diff --git a/.github/workflows/test_lint.yml b/.github/workflows/fmt_test_lint.yml similarity index 93% rename from .github/workflows/test_lint.yml rename to .github/workflows/fmt_test_lint.yml index 0ebed6d7..c396fd62 100644 --- a/.github/workflows/test_lint.yml +++ b/.github/workflows/fmt_test_lint.yml @@ -3,7 +3,7 @@ name: ci on: [ push ] jobs: - test: + fmt_test: runs-on: ubuntu-20.04 container: clojure:openjdk-19-tools-deps-1.11.1.1113-bullseye services: @@ -22,6 +22,8 @@ jobs: RABBITMQ_DEFAULT_PASS: top-gun steps: - uses: actions/checkout@v3 + - name: Check formatting + run: clojure -M:cljfmt check - name: Run tests run: clj -X:test env: diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index f8d6413c..df7822eb 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -28,6 +28,16 @@ $ clj -X:test # Running all tests. $ docker-compose down # ...when you're done. ``` +Formatting +-------- +- Goose adheres to the [weavejester/cljfmt](https://clojars.org/dev.weavejester/cljfmt) library to maintain consistent code formatting. +- Install the appropriate plugin for your IDE to ensure compliance. + +```shell +$ clj -M:cljfmt fix # Automatically fix formatting issues. +$ clj -M:cljfmt check # Check any formatting inconsistencies. +``` + Linting -------- - Install [clj-kondo v2022.10.05](https://github.com/clj-kondo/clj-kondo/blob/master/doc/install.md#installation-script-macos-and-linux) diff --git a/README.md b/README.md index 68b884ed..3e2a02e6 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Features - Pluggable [Message Broker](https://github.com/nilenso/goose/wiki/Guide-to-Message-Broker-Integration) & [Metrics Backend](https://github.com/nilenso/goose/wiki/Guide-to-Custom-Metrics-Backend) - [Scheduled Jobs](https://github.com/nilenso/goose/wiki/Scheduled-Jobs) - [Batch Jobs](https://github.com/nilenso/goose/wiki/Batch-Jobs) -- [Cron Jobs](https://github.com/nilenso/goose/wiki/Cron) +- [Cron Jobs](https://github.com/nilenso/goose/wiki/Cron-Jobs) - [Error Handling & Retries](https://github.com/nilenso/goose/wiki/Error-Handling-&-Retries) - [Console](https://github.com/nilenso/goose/wiki/Console) - Extensible using [Middlewares](https://github.com/nilenso/goose/wiki/Middlewares) @@ -76,7 +76,7 @@ com.nilenso/goose {:mvn/version "0.5.3"} (w/stop worker) ; Performs graceful shutsdown. (rmq/close rmq-consumer)) ``` -Refer to wiki for [Redis](https://github.com/nilenso/goose/wiki/Redis), [Cron](https://github.com/nilenso/goose/wiki/Cron), [Error Handling](https://github.com/nilenso/goose/wiki/Error-Handling-&-Retries), [Monitoring](https://github.com/nilenso/goose/wiki/Monitoring-&-Alerting), [Production Readiness](https://github.com/nilenso/goose/wiki/Production-Readiness), etc. +Refer to wiki for [Redis](https://github.com/nilenso/goose/wiki/Redis), [Cron Jobs](https://github.com/nilenso/goose/wiki/Cron-Jobs), [Error Handling](https://github.com/nilenso/goose/wiki/Error-Handling-&-Retries), [Monitoring](https://github.com/nilenso/goose/wiki/Monitoring-&-Alerting), [Production Readiness](https://github.com/nilenso/goose/wiki/Production-Readiness), etc. Getting Help --------- diff --git a/architecture-decisions/README.md b/architecture-decisions/README.md index a621a743..ece4b388 100644 --- a/architecture-decisions/README.md +++ b/architecture-decisions/README.md @@ -23,7 +23,7 @@ Index 1. [Enqueue-Dequeue](https://github.com/nilenso/goose/blob/main/architecture-decisions/pages/enqueue-dequeue.md) 1. [Priority/Customized Queues](https://github.com/nilenso/goose/blob/main/architecture-decisions/pages/priority-queues.md) 1. [Scheduled Jobs](https://github.com/nilenso/goose/blob/main/architecture-decisions/pages/scheduled-jobs.md) -1. [Cron Jobs](https://github.com/nilenso/goose/blob/main/architecture-decisions/pages/periodic-jobs.md) +1. [Cron Jobs](https://github.com/nilenso/goose/blob/main/architecture-decisions/pages/cron-jobs.md) 1. [Batch Jobs](https://github.com/nilenso/goose/blob/main/architecture-decisions/pages/batch-jobs.md) 1. [Error-handling & Retrying](https://github.com/nilenso/goose/blob/main/architecture-decisions/pages/error-handling.md) 1. [API](https://github.com/nilenso/goose/blob/main/architecture-decisions/pages/api.md) diff --git a/architecture-decisions/pages/periodic-jobs.md b/architecture-decisions/pages/cron-jobs.md similarity index 100% rename from architecture-decisions/pages/periodic-jobs.md rename to architecture-decisions/pages/cron-jobs.md diff --git a/deps.edn b/deps.edn index cdf3aed9..c888f3c5 100644 --- a/deps.edn +++ b/deps.edn @@ -21,6 +21,8 @@ :exec-fn test-runner/test-and-shutdown} :repl {:extra-deps {vvvvalvalval/scope-capture {:mvn/version "0.3.2"} org.clojure/tools.namespace {:mvn/version "1.3.0"}}} + :cljfmt {:deps {dev.weavejester/cljfmt {:mvn/version "0.12.0"}} + :main-opts ["-m" "cljfmt.main"]} :redis-perf {:extra-paths ["perf"] :exec-fn goose.redis.load/benchmark :extra-deps {criterium/criterium {:mvn/version "0.4.6"} diff --git a/src/goose/api/batch.clj b/src/goose/api/batch.clj index b0ee9f06..8d6c8647 100644 --- a/src/goose/api/batch.clj +++ b/src/goose/api/batch.clj @@ -1,7 +1,7 @@ (ns goose.api.batch (:require - [goose.broker :as b] - [goose.client])) + [goose.broker :as b] + [goose.client])) (defn status "For given `:batch-id`, reports progress of a batch. diff --git a/src/goose/api/cron_jobs.clj b/src/goose/api/cron_jobs.clj index d72e6b9c..c0b81336 100644 --- a/src/goose/api/cron_jobs.clj +++ b/src/goose/api/cron_jobs.clj @@ -1,12 +1,12 @@ (ns goose.api.cron-jobs - "API to manage Cron Entries.\\ + "API to manage Cron Entries & Cron-Scheduled Jobs.\\ To update a cron entry, call [[goose.client/perform-every]] since it is idempotent. - [API wiki](https://github.com/nilenso/goose/wiki/API)" (:require - [goose.broker :as b])) + [goose.broker :as b])) (defn size - "Returns count of Cron entries." + "Returns count of Cron Jobs." [broker] (b/cron-jobs-size broker)) diff --git a/src/goose/api/dead_jobs.clj b/src/goose/api/dead_jobs.clj index e70aa676..4486ff44 100644 --- a/src/goose/api/dead_jobs.clj +++ b/src/goose/api/dead_jobs.clj @@ -3,7 +3,7 @@ - [API wiki](https://github.com/nilenso/goose/wiki/API)" (:refer-clojure :exclude [pop]) (:require - [goose.broker :as b])) + [goose.broker :as b])) (defn size "Returns count of Dead Jobs." diff --git a/src/goose/api/enqueued_jobs.clj b/src/goose/api/enqueued_jobs.clj index 2b8cf39c..cbfe3787 100644 --- a/src/goose/api/enqueued_jobs.clj +++ b/src/goose/api/enqueued_jobs.clj @@ -2,7 +2,7 @@ "API to manage enqueued jobs. - [API wiki](https://github.com/nilenso/goose/wiki/API)" (:require - [goose.broker :as b])) + [goose.broker :as b])) (defn list-all-queues "Lists all the queues." diff --git a/src/goose/api/scheduled_jobs.clj b/src/goose/api/scheduled_jobs.clj index e07433be..2797ca3b 100644 --- a/src/goose/api/scheduled_jobs.clj +++ b/src/goose/api/scheduled_jobs.clj @@ -2,7 +2,7 @@ "API to manage scheduled jobs. - [API wiki](https://github.com/nilenso/goose/wiki/API)" (:require - [goose.broker :as b])) + [goose.broker :as b])) (defn size "Returns count of Scheduled Jobs." diff --git a/src/goose/batch.clj b/src/goose/batch.clj index 05493d80..6a5de25a 100644 --- a/src/goose/batch.clj +++ b/src/goose/batch.clj @@ -1,9 +1,9 @@ (ns goose.batch (:require - [goose.job :as job] - [goose.utils :as u] + [goose.job :as job] + [goose.utils :as u] - [clojure.tools.logging :as log])) + [clojure.tools.logging :as log])) (defn construct-args "A utility function to construct a collection of args for batch-jobs." diff --git a/src/goose/broker.clj b/src/goose/broker.clj index 8b7f9c28..f5c7515e 100644 --- a/src/goose/broker.clj +++ b/src/goose/broker.clj @@ -43,7 +43,7 @@ (scheduled-jobs-purge [this] "Purges all the Scheduled Jobs.") ;; cron jobs API - (cron-jobs-size [this] "Returns count of Cron entries.") + (cron-jobs-size [this] "Returns count of Cron Entries.") (cron-jobs-find-by-name [this entry-name] "Finds a Cron Entry by `:name`.") (cron-jobs-delete [this entry-name] "Deletes Cron Entry & Cron-Scheduled Job of given `:name`.") (cron-jobs-purge [this] "Purges all the Cron Entries & Cron-Scheduled Jobs.") diff --git a/src/goose/brokers/redis/api/batch.clj b/src/goose/brokers/redis/api/batch.clj index 8634b8b9..c7198023 100644 --- a/src/goose/brokers/redis/api/batch.clj +++ b/src/goose/brokers/redis/api/batch.clj @@ -1,9 +1,9 @@ (ns ^:no-doc goose.brokers.redis.api.batch (:require - [goose.brokers.redis.api.enqueued-jobs :as enqueued-jobs] - [goose.brokers.redis.api.scheduled-jobs :as scheduled-jobs] - [goose.brokers.redis.batch :as batch] - [goose.brokers.redis.commands :as redis-cmds])) + [goose.brokers.redis.api.enqueued-jobs :as enqueued-jobs] + [goose.brokers.redis.api.scheduled-jobs :as scheduled-jobs] + [goose.brokers.redis.batch :as batch] + [goose.brokers.redis.commands :as redis-cmds])) (defn status [redis-conn id] (batch/get-batch redis-conn id)) diff --git a/src/goose/brokers/redis/api/dead_jobs.clj b/src/goose/brokers/redis/api/dead_jobs.clj index 9e6e4cc3..5739bf8a 100644 --- a/src/goose/brokers/redis/api/dead_jobs.clj +++ b/src/goose/brokers/redis/api/dead_jobs.clj @@ -1,9 +1,9 @@ (ns ^:no-doc goose.brokers.redis.api.dead-jobs (:refer-clojure :exclude [pop]) (:require - [goose.brokers.redis.commands :as redis-cmds] - [goose.defaults :as d] - [goose.job :as job])) + [goose.brokers.redis.commands :as redis-cmds] + [goose.defaults :as d] + [goose.job :as job])) (defn size [redis-conn] (redis-cmds/sorted-set-size redis-conn d/prefixed-dead-queue)) @@ -51,7 +51,7 @@ (defn delete-older-than [redis-conn epoch-ms] (< 0 (redis-cmds/del-from-sorted-set-until - redis-conn d/prefixed-dead-queue epoch-ms))) + redis-conn d/prefixed-dead-queue epoch-ms))) (defn purge [redis-conn] (= 1 (redis-cmds/del-keys redis-conn d/prefixed-dead-queue))) diff --git a/src/goose/brokers/redis/api/enqueued_jobs.clj b/src/goose/brokers/redis/api/enqueued_jobs.clj index 3cd85e74..8f698f1f 100644 --- a/src/goose/brokers/redis/api/enqueued_jobs.clj +++ b/src/goose/brokers/redis/api/enqueued_jobs.clj @@ -1,8 +1,8 @@ (ns ^:no-doc goose.brokers.redis.api.enqueued-jobs (:require - [goose.brokers.redis.commands :as redis-cmds] - [goose.defaults :as d] - [goose.job :as job])) + [goose.brokers.redis.commands :as redis-cmds] + [goose.defaults :as d] + [goose.job :as job])) (defn list-all-queues "Lists all the queues" diff --git a/src/goose/brokers/redis/api/scheduled_jobs.clj b/src/goose/brokers/redis/api/scheduled_jobs.clj index a5da764f..bcfdc86a 100644 --- a/src/goose/brokers/redis/api/scheduled_jobs.clj +++ b/src/goose/brokers/redis/api/scheduled_jobs.clj @@ -1,8 +1,8 @@ (ns ^:no-doc goose.brokers.redis.api.scheduled-jobs (:require - [goose.brokers.redis.commands :as redis-cmds] - [goose.defaults :as d] - [goose.job :as job])) + [goose.brokers.redis.commands :as redis-cmds] + [goose.defaults :as d] + [goose.job :as job])) (defn size [redis-conn] (redis-cmds/sorted-set-size redis-conn d/prefixed-schedule-queue)) diff --git a/src/goose/brokers/redis/batch.clj b/src/goose/brokers/redis/batch.clj index 8d0baae8..26d6bfec 100644 --- a/src/goose/brokers/redis/batch.clj +++ b/src/goose/brokers/redis/batch.clj @@ -1,15 +1,15 @@ (ns ^:no-doc goose.brokers.redis.batch (:require - [goose.batch :as batch] - [goose.brokers.redis.commands :as redis-cmds] - [goose.defaults :as d] - [goose.job :as job] - [goose.metrics :as m] - [goose.retry] - [goose.utils :as u] + [goose.batch :as batch] + [goose.brokers.redis.commands :as redis-cmds] + [goose.defaults :as d] + [goose.job :as job] + [goose.metrics :as m] + [goose.retry] + [goose.utils :as u] - [taoensso.carmine :as car] - [clojure.tools.logging :as log])) + [taoensso.carmine :as car] + [clojure.tools.logging :as log])) (defn batch-keys [id] {:batch-hash (d/prefix-batch id) @@ -24,12 +24,12 @@ pruned-batch-state (dissoc batch :jobs) job-ids (map :id jobs)] (redis-cmds/atomic - redis-conn - (car/multi) - (car/hmset* batch-hash pruned-batch-state) - (apply car/sadd enqueued-set job-ids) - (doseq [job jobs] - (car/lpush (:ready-queue job) job))))) + redis-conn + (car/multi) + (car/hmset* batch-hash pruned-batch-state) + (apply car/sadd enqueued-set job-ids) + (doseq [job jobs] + (car/lpush (:ready-queue job) job))))) ;;; When fetching a hash from Redis, primitive data types ;;; like integer, boolean and keyword get converted to string. @@ -37,27 +37,27 @@ (defn- restore-data-types [{:keys [linger-sec total status created-at] :as batch}] (assoc batch - :linger-sec (Long/valueOf linger-sec) - :total (Long/valueOf total) - :status (keyword status) - :created-at (Long/valueOf created-at))) + :linger-sec (Long/valueOf linger-sec) + :total (Long/valueOf total) + :status (keyword status) + :created-at (Long/valueOf created-at))) (defn get-batch [redis-conn id] (let [{:keys [batch-hash enqueued-set retrying-job-set success-set dead-set]} (batch-keys id) [batch enqueued retrying success dead] (redis-cmds/wcar* - redis-conn - (car/parse-map (car/hgetall batch-hash) :keywordize) - (car/scard enqueued-set) - (car/scard retrying-job-set) - (car/scard success-set) - (car/scard dead-set))] + redis-conn + (car/parse-map (car/hgetall batch-hash) :keywordize) + (car/scard enqueued-set) + (car/scard retrying-job-set) + (car/scard success-set) + (car/scard dead-set))] (when (not-empty batch) (assoc (restore-data-types batch) - :enqueued enqueued - :retrying retrying - :success success - :dead dead)))) + :enqueued enqueued + :retrying retrying + :success success + :dead dead)))) (defn- record-metrics [{:keys [metrics-plugin]} @@ -77,17 +77,17 @@ (let [{:keys [batch-hash success-set dead-set]} (batch-keys id) callback (batch/new-callback-job batch id completion-status)] (redis-cmds/atomic - redis-conn - (car/multi) + redis-conn + (car/multi) ;; Enqueue callback to front of queue. - (car/rpush ready-queue callback) + (car/rpush ready-queue callback) ;; Update batch status to reflect completion status. - (car/hset batch-hash :status completion-status) + (car/hset batch-hash :status completion-status) ;; Terminal job execution marks completion of a batch, NOT callback execution. ;; Clean-up batch after enqueuing callback. - (car/expire batch-hash linger-sec "NX") - (car/expire success-set linger-sec "NX") - (car/expire dead-set linger-sec "NX")))) + (car/expire batch-hash linger-sec "NX") + (car/expire success-set linger-sec "NX") + (car/expire dead-set linger-sec "NX")))) (defn- mark-batch-completion [{:keys [redis-conn] :as opts} job batch-id completion-status] @@ -123,13 +123,13 @@ [redis-conn src dst job-id batch-keys status] (let [{:keys [enqueued-set retrying-set success-set dead-set]} batch-keys [_ atomic-results] (redis-cmds/atomic - redis-conn - (car/multi) - (car/smove src dst job-id) - (car/scard enqueued-set) - (car/scard retrying-set) - (car/scard success-set) - (car/scard dead-set)) + redis-conn + (car/multi) + (car/smove src dst job-id) + (car/scard enqueued-set) + (car/scard retrying-set) + (car/scard success-set) + (car/scard dead-set)) [_ enqueued retrying success dead] atomic-results] (reset! status (batch/status-from-job-states enqueued retrying success dead)))) diff --git a/src/goose/brokers/redis/broker.clj b/src/goose/brokers/redis/broker.clj index 736a0443..2628e2e3 100644 --- a/src/goose/brokers/redis/broker.clj +++ b/src/goose/brokers/redis/broker.clj @@ -1,19 +1,19 @@ (ns goose.brokers.redis.broker (:require - [goose.broker :as b] - [goose.brokers.redis.api.batch :as batch-api] - [goose.brokers.redis.api.dead-jobs :as dead-jobs] - [goose.brokers.redis.api.enqueued-jobs :as enqueued-jobs] - [goose.brokers.redis.api.scheduled-jobs :as scheduled-jobs] - [goose.brokers.redis.batch :as batch] - [goose.brokers.redis.commands :as redis-cmds] - [goose.brokers.redis.connection :as redis-connection] - [goose.brokers.redis.cron :as cron] - [goose.brokers.redis.scheduler :as redis-scheduler] - [goose.brokers.redis.worker :as redis-worker] - [goose.brokers.redis.console :as redis-console] - [goose.defaults :as d] - [goose.specs :as specs])) + [goose.broker :as b] + [goose.brokers.redis.api.batch :as batch-api] + [goose.brokers.redis.api.dead-jobs :as dead-jobs] + [goose.brokers.redis.api.enqueued-jobs :as enqueued-jobs] + [goose.brokers.redis.api.scheduled-jobs :as scheduled-jobs] + [goose.brokers.redis.batch :as batch] + [goose.brokers.redis.commands :as redis-cmds] + [goose.brokers.redis.connection :as redis-connection] + [goose.brokers.redis.cron :as cron-jobs] + [goose.brokers.redis.scheduler :as redis-scheduler] + [goose.brokers.redis.worker :as redis-worker] + [goose.brokers.redis.console :as redis-console] + [goose.defaults :as d] + [goose.specs :as specs])) (defrecord Redis [redis-conn opts] b/Broker @@ -24,7 +24,7 @@ (schedule [this schedule-epoch-ms job] (redis-scheduler/run-at (:redis-conn this) schedule-epoch-ms job)) (register-cron [this cron-opts job-description] - (cron/register (:redis-conn this) cron-opts job-description)) + (cron-jobs/register (:redis-conn this) cron-opts job-description)) (enqueue-batch [this batch] (batch/enqueue (:redis-conn this) batch) @@ -64,13 +64,13 @@ ;; cron entries API (cron-jobs-size [this] - (cron/size (:redis-conn this))) + (cron-jobs/size (:redis-conn this))) (cron-jobs-find-by-name [this entry-name] - (cron/find-by-name (:redis-conn this) entry-name)) + (cron-jobs/find-by-name (:redis-conn this) entry-name)) (cron-jobs-delete [this entry-name] - (cron/delete (:redis-conn this) entry-name)) + (cron-jobs/delete (:redis-conn this) entry-name)) (cron-jobs-purge [this] - (cron/purge (:redis-conn this))) + (cron-jobs/purge (:redis-conn this))) ;; dead-jobs API (dead-jobs-size [this] @@ -97,7 +97,7 @@ (batch-api/status (:redis-conn this) id)) (batch-delete [this id] (batch-api/delete (:redis-conn this) id)) - + ;; console API (handler [this req] (redis-console/handler this req))) @@ -154,7 +154,7 @@ ([conn-opts scheduler-polling-interval-sec] (specs/assert-redis-consumer conn-opts scheduler-polling-interval-sec) (let [opts (assoc conn-opts - :scheduler-polling-interval-sec scheduler-polling-interval-sec)] + :scheduler-polling-interval-sec scheduler-polling-interval-sec)] ;; Connection to Redis is opened/closed from start/stop functions of worker. ;; This was done to avoid duplication of code & mis-match in `threads` config. (->Redis nil opts)))) diff --git a/src/goose/brokers/redis/commands.clj b/src/goose/brokers/redis/commands.clj index cc4cc230..2b823a57 100644 --- a/src/goose/brokers/redis/commands.clj +++ b/src/goose/brokers/redis/commands.clj @@ -1,9 +1,9 @@ (ns ^:no-doc goose.brokers.redis.commands (:require - [goose.defaults :as d] - [goose.utils :as u] + [goose.defaults :as d] + [goose.utils :as u] - [taoensso.carmine :as car])) + [taoensso.carmine :as car])) (def atomic-lock-attempts 100) (def ^:private initial-scan-cursor 0) @@ -40,11 +40,11 @@ (scan-seq conn scan-fn redis-key initial-scan-cursor)) ([conn scan-fn redis-key cursor] (lazy-seq - (let [[next-cursor-string items] (scan-fn conn redis-key cursor) - next-cursor (ensure-int next-cursor-string)] - (concat items - (when (pos? next-cursor) - (scan-seq conn scan-fn redis-key next-cursor))))))) + (let [[next-cursor-string items] (scan-fn conn redis-key cursor) + next-cursor (ensure-int next-cursor-string)] + (concat items + (when (pos? next-cursor) + (scan-seq conn scan-fn redis-key next-cursor))))))) (defmacro atomic "A simple wrapper over Carmine's `atomic` macro. @@ -52,8 +52,8 @@ need values returned by redis, and not surrounding function." [conn & body] `(car/atomic ~conn - atomic-lock-attempts - ~@body)) + atomic-lock-attempts + ~@body)) (defn run-with-transaction "Runs fn inside a Carmine atomic block, and returns @@ -63,7 +63,7 @@ (car/atomic conn atomic-lock-attempts ;; This ugliness is necessary because car/atomic does not ;; return the value of the last expression inside it. - (reset! return-value (f))) + (reset! return-value (f))) @return-value)) (defmacro with-transaction @@ -221,20 +221,20 @@ (let [limit "limit" offset 0] (not-empty - (wcar* - conn - (car/zrangebyscore - sorted-set sorted-set-min (u/epoch-time-ms) - limit offset d/redis-scheduled-jobs-pop-limit))))) + (wcar* + conn + (car/zrangebyscore + sorted-set sorted-set-min (u/epoch-time-ms) + limit offset d/redis-scheduled-jobs-pop-limit))))) (defn sorted-set->ready-queue [conn sorted-set jobs grouping-fn] (car/atomic - conn atomic-lock-attempts - (car/multi) - (doseq [[queue jobs] (group-by grouping-fn jobs)] - (apply car/rpush queue jobs)) - (apply car/zrem sorted-set jobs))) + conn atomic-lock-attempts + (car/multi) + (doseq [[queue jobs] (group-by grouping-fn jobs)] + (apply car/rpush queue jobs)) + (apply car/zrem sorted-set jobs))) (defn sorted-set-scores [conn sorted-set & elements] (wcar* conn (apply car/zmscore sorted-set elements))) diff --git a/src/goose/brokers/redis/console/data.clj b/src/goose/brokers/redis/console/data.clj index bb25b35d..351e582f 100644 --- a/src/goose/brokers/redis/console/data.clj +++ b/src/goose/brokers/redis/console/data.clj @@ -2,7 +2,7 @@ (:require [goose.brokers.redis.api.dead-jobs :as dead-jobs] [goose.brokers.redis.api.enqueued-jobs :as enqueued-jobs] [goose.brokers.redis.api.scheduled-jobs :as scheduled-jobs] - [goose.brokers.redis.cron :as cron] + [goose.brokers.redis.cron :as cron-jobs] [goose.defaults :as d] [goose.job :as job])) @@ -20,7 +20,7 @@ enqueued (reduce (fn [total queue] (+ total (enqueued-jobs/size redis-conn queue))) 0 queues) scheduled (scheduled-jobs/size redis-conn) - cron (cron/size redis-conn) + cron (cron-jobs/size redis-conn) dead (dead-jobs/size redis-conn)] {:enqueued enqueued :scheduled scheduled @@ -69,7 +69,7 @@ queue (* (dec page) d/page-size) (dec (* page d/page-size))) - :total-jobs (enqueued-jobs/size redis-conn queue)) + :total-jobs (enqueued-jobs/size redis-conn queue)) (invalid-filter-value? validated-filter-value) no-jobs-response))) @@ -101,7 +101,7 @@ (assoc base-result :jobs (dead-jobs/get-by-range redis-conn (* d/page-size (dec page)) (dec (* d/page-size page))) - :total-jobs (dead-jobs/size redis-conn)) + :total-jobs (dead-jobs/size redis-conn)) (invalid-filter-value? validated-filter-value) (assoc base-result :jobs [])))) @@ -139,9 +139,9 @@ (get-all-jobs-request? validated-filter-type validated-filter-value) (assoc base-result :total-jobs (scheduled-jobs/size redis-conn) - :jobs (scheduled-jobs/get-by-range redis-conn - (* (dec page) d/page-size) - (dec (* page d/page-size)))) + :jobs (scheduled-jobs/get-by-range redis-conn + (* (dec page) d/page-size) + (dec (* page d/page-size)))) (invalid-filter-value? validated-filter-value) (assoc base-result :jobs [])))) @@ -151,13 +151,13 @@ validated-filter-value :filter-value}] (cond (filter-jobs-request? validated-filter-type validated-filter-value) - {:jobs (if-let [job (cron/find-by-name redis-conn validated-filter-value)] + {:jobs (if-let [job (cron-jobs/find-by-name redis-conn validated-filter-value)] [job] [])} (get-all-jobs-request? validated-filter-type validated-filter-value) - {:total-jobs (cron/size redis-conn) - :jobs (cron/get-all redis-conn)} + {:total-jobs (cron-jobs/size redis-conn) + :jobs (cron-jobs/get-all redis-conn)} (invalid-filter-value? validated-filter-value) {:jobs []})) diff --git a/src/goose/brokers/redis/console/pages/batch.clj b/src/goose/brokers/redis/console/pages/batch.clj index 35ff65b5..f7bbae64 100644 --- a/src/goose/brokers/redis/console/pages/batch.clj +++ b/src/goose/brokers/redis/console/pages/batch.clj @@ -6,7 +6,7 @@ [goose.utils :as utils] [ring.util.response :as response]) (:import - (java.util Date))) + (java.util Date))) (defn- batch-job-table [{:keys [id callback-fn-sym @@ -71,9 +71,9 @@ :type "hidden" :value (utils/encode-to-str job)}] (c/action-btns [(c/delete-btn - [:div "Are you sure you want to delete " - [:span.highlight (get job :total)] " job/s?"] - {:disabled false})]) + [:div "Are you sure you want to delete " + [:span.highlight (get job :total)] " job/s?"] + {:disabled false})]) (batch-job-table job)]]]) (defn- filter-header [{:keys [base-path job-type] diff --git a/src/goose/brokers/redis/console/pages/components.clj b/src/goose/brokers/redis/console/pages/components.clj index 9de2dc09..6cbe2407 100644 --- a/src/goose/brokers/redis/console/pages/components.clj +++ b/src/goose/brokers/redis/console/pages/components.clj @@ -55,7 +55,7 @@ (defn filter-header [filter-types {:keys [job-type base-path] {:keys [filter-type filter-value limit]} :params}] (let [filter-values-for-type {:enqueued ["unexecuted" "failed"] - :scheduled ["scheduled" "failed"]} ] + :scheduled ["scheduled" "failed"]}] [:div.header [:form.filter-opts {:action base-path :method "get"} diff --git a/src/goose/brokers/redis/console/pages/cron.clj b/src/goose/brokers/redis/console/pages/cron.clj index 2d416d78..54f19133 100644 --- a/src/goose/brokers/redis/console/pages/cron.clj +++ b/src/goose/brokers/redis/console/pages/cron.clj @@ -13,7 +13,7 @@ :method "post"} [:div.padding-top (c/action-btns [(c/delete-btn - [:div "Are you sure you want to delete selected jobs?"])])] + [:div "Are you sure you want to delete selected jobs?"])])] [:table.jobs-table [:thead [:th.name "Cron name"] @@ -34,7 +34,7 @@ cron-schedule [:span.tooltip-text [:div.tooltip-content - (CronExpressionDescriptor/getDescription cron-schedule)]]]] + (CronExpressionDescriptor/getDescription cron-schedule)]]]] [:td [:div.timezone timezone]] [:td [:div.queue] queue] [:td [:div.execute-fn-sym (str execute-fn-sym)]] @@ -130,8 +130,8 @@ :method "post"} [:div (c/action-btns [(c/delete-btn - "Are you sure you want to delete the job?" - {:disabled false})]) + "Are you sure you want to delete the job?" + {:disabled false})]) [:input {:name "cron-name" :type "hidden" :value cron-name}] @@ -151,10 +151,10 @@ validated-params (validate-get-jobs params) data (data/cron-page-data redis-conn validated-params)] (response/response (view "Cron" (assoc data :app-name app-name - :job-type :cron - :base-path (prefix-route "/cron") - :prefix-route prefix-route - :params params))))) + :job-type :cron + :base-path (prefix-route "/cron") + :prefix-route prefix-route + :params params))))) (defn delete-jobs [{{{:keys [redis-conn]} :broker} :console-opts :keys [prefix-route] diff --git a/src/goose/brokers/redis/console/pages/dead.clj b/src/goose/brokers/redis/console/pages/dead.clj index 25fd4f4c..cdaae23d 100644 --- a/src/goose/brokers/redis/console/pages/dead.clj +++ b/src/goose/brokers/redis/console/pages/dead.clj @@ -9,7 +9,7 @@ [goose.utils :as utils] [ring.util.response :as response]) (:import - (java.util Date))) + (java.util Date))) (defn validate-get-jobs [{:keys [page filter-type limit filter-value]}] (let [page (specs/validate-or-default ::specs/page @@ -38,7 +38,7 @@ :method "post"} (c/action-btns [(c/replay-btn) (c/delete-btn - [:div "Are you sure you want to delete selected jobs?"])]) + [:div "Are you sure you want to delete selected jobs?"])]) [:table.jobs-table [:thead [:tr @@ -91,8 +91,8 @@ [:div (c/action-btns [(c/replay-btn {:disabled false}) (c/delete-btn - "Are you sure you want to delete the job?" - {:disabled false})]) + "Are you sure you want to delete the job?" + {:disabled false})]) [:input {:name "job" :type "hidden" :value (utils/encode-to-str job)}] @@ -107,10 +107,10 @@ validated-params (validate-get-jobs params) data (data/dead-page-data (:redis-conn broker) validated-params)] (response/response (view "Dead" (assoc data :params params - :job-type :dead - :base-path (prefix-route "/dead") - :app-name app-name - :prefix-route prefix-route))))) + :job-type :dead + :base-path (prefix-route "/dead") + :app-name app-name + :prefix-route prefix-route))))) (defn purge-queue [{{{:keys [redis-conn]} :broker} :console-opts :keys [prefix-route]}] diff --git a/src/goose/brokers/redis/console/pages/enqueued.clj b/src/goose/brokers/redis/console/pages/enqueued.clj index 2ee85aad..3ef62ebb 100644 --- a/src/goose/brokers/redis/console/pages/enqueued.clj +++ b/src/goose/brokers/redis/console/pages/enqueued.clj @@ -9,7 +9,7 @@ [goose.utils :as utils] [ring.util.response :as response]) (:import - (java.util Date))) + (java.util Date))) (defn- sidebar [{:keys [prefix-route queues queue]}] [:div#sidebar @@ -26,7 +26,7 @@ :method "post"} (c/action-btns [(c/prioritise-btn) (c/delete-btn - [:div "Are you sure you want to delete selected jobs in " [:span.highlight queue] " queue?"])]) + [:div "Are you sure you want to delete selected jobs in " [:span.highlight queue] " queue?"])]) [:table.jobs-table [:thead [:tr @@ -62,8 +62,8 @@ [:div (c/action-btns [(c/prioritise-btn {:disabled false}) (c/delete-btn - "Are you sure you want to delete the job?" - {:disabled false})]) + "Are you sure you want to delete the job?" + {:disabled false})]) [:input {:name "job" :type "hidden" :value (utils/encode-to-str job)}] @@ -121,9 +121,9 @@ {:keys [id queue]} (specs/validate-req-params params)] (if id (response/response (view "Enqueued" (-> {:job (enqueued-jobs/find-by-id - redis-conn - queue - id)} + redis-conn + queue + id)} (assoc :job-type :enqueued :base-path (prefix-route "/enqueued/queue/" queue) :app-name app-name @@ -137,12 +137,12 @@ validated-params (validate-get-jobs params) {:keys [queue] :as data} (data/enqueued-page-data (:redis-conn broker) validated-params)] (response/response (view "Enqueued" (assoc data :job-type :enqueued - :base-path (if-let [q queue] - (prefix-route "/enqueued/queue/" q) - (prefix-route "/enqueued")) - :params params - :app-name app-name - :prefix-route prefix-route))))) + :base-path (if-let [q queue] + (prefix-route "/enqueued/queue/" q) + (prefix-route "/enqueued")) + :params params + :app-name app-name + :prefix-route prefix-route))))) (defn purge-queue [{{:keys [broker]} :console-opts params :params diff --git a/src/goose/brokers/redis/console/pages/home.clj b/src/goose/brokers/redis/console/pages/home.clj index e14b7362..a53d0d90 100644 --- a/src/goose/brokers/redis/console/pages/home.clj +++ b/src/goose/brokers/redis/console/pages/home.clj @@ -21,5 +21,5 @@ (let [view (console/layout c/header stats-bar) data (data/jobs-size (:redis-conn broker))] (response/response (view "Home" (assoc data :uri uri - :app-name app-name - :prefix-route prefix-route))))) + :app-name app-name + :prefix-route prefix-route))))) diff --git a/src/goose/brokers/redis/console/pages/scheduled.clj b/src/goose/brokers/redis/console/pages/scheduled.clj index 2472f600..ea8123e3 100644 --- a/src/goose/brokers/redis/console/pages/scheduled.clj +++ b/src/goose/brokers/redis/console/pages/scheduled.clj @@ -9,14 +9,14 @@ [goose.utils :as utils] [ring.util.response :as response]) (:import - (java.util Date))) + (java.util Date))) (defn jobs-table [{:keys [base-path jobs]}] [:form {:action (str base-path "/jobs") :method "post"} (c/action-btns [(c/prioritise-btn) (c/delete-btn - [:div "Are you sure you want to delete selected jobs?"])]) + [:div "Are you sure you want to delete selected jobs?"])]) [:table.jobs-table [:thead [:th.when-h [:div.when-label "When"] @@ -77,8 +77,8 @@ [:div (c/action-btns [(c/prioritise-btn {:disabled false}) (c/delete-btn - "Are you sure you want to delete the job?" - {:disabled false})]) + "Are you sure you want to delete the job?" + {:disabled false})]) [:input {:name "job" :type "hidden" :value (utils/encode-to-str job)}] @@ -115,10 +115,10 @@ validated-params (validate-get-jobs params) data (data/scheduled-page-data redis-conn validated-params)] (response/response (view "Scheduled" (assoc data :app-name app-name - :job-type :scheduled - :base-path (prefix-route "/scheduled") - :prefix-route prefix-route - :params params))))) + :job-type :scheduled + :base-path (prefix-route "/scheduled") + :prefix-route prefix-route + :params params))))) (defn purge-queue [{{{:keys [redis-conn]} :broker} :console-opts :keys [prefix-route]}] diff --git a/src/goose/brokers/redis/console/specs.clj b/src/goose/brokers/redis/console/specs.clj index c048463c..a128c928 100644 --- a/src/goose/brokers/redis/console/specs.clj +++ b/src/goose/brokers/redis/console/specs.clj @@ -1,7 +1,7 @@ (ns ^:no-doc goose.brokers.redis.console.specs (:require [clojure.spec.alpha :as s]) (:import - (java.lang Long))) + (java.lang Long))) (s/def ::page pos-int?) (s/def ::queue string?) diff --git a/src/goose/brokers/redis/consumer.clj b/src/goose/brokers/redis/consumer.clj index 3b0a9fdc..a590041d 100644 --- a/src/goose/brokers/redis/consumer.clj +++ b/src/goose/brokers/redis/consumer.clj @@ -1,10 +1,10 @@ (ns ^:no-doc goose.brokers.redis.consumer (:require - [goose.brokers.redis.commands :as redis-cmds] - [goose.defaults :as d] - [goose.utils :as u] + [goose.brokers.redis.commands :as redis-cmds] + [goose.defaults :as d] + [goose.utils :as u] - [clojure.tools.logging :as log])) + [clojure.tools.logging :as log])) (defn preservation-queue [id] (str d/in-progress-queue-prefix id)) @@ -14,8 +14,8 @@ :as opts}] (log/debug "Long-polling broker...") (u/while-pool - thread-pool - (u/log-on-exceptions - (when-let [job (redis-cmds/dequeue-and-preserve redis-conn ready-queue in-progress-queue)] - (call opts job) - (redis-cmds/del-from-list redis-conn in-progress-queue job))))) + thread-pool + (u/log-on-exceptions + (when-let [job (redis-cmds/dequeue-and-preserve redis-conn ready-queue in-progress-queue)] + (call opts job) + (redis-cmds/del-from-list redis-conn in-progress-queue job))))) diff --git a/src/goose/brokers/redis/cron.clj b/src/goose/brokers/redis/cron.clj index bdbb7b1f..d0874447 100644 --- a/src/goose/brokers/redis/cron.clj +++ b/src/goose/brokers/redis/cron.clj @@ -1,14 +1,14 @@ (ns ^:no-doc goose.brokers.redis.cron (:require - [goose.brokers.redis.commands :as redis-cmds] - [goose.cron.parsing :as cron-parsing] - [goose.defaults :as d] - [goose.job :as j] - [goose.utils :as u] + [goose.brokers.redis.commands :as redis-cmds] + [goose.cron.parsing :as cron-parsing] + [goose.defaults :as d] + [goose.job :as j] + [goose.utils :as u] - [taoensso.carmine :as car]) + [taoensso.carmine :as car]) (:import - (java.time ZoneId))) + (java.time ZoneId))) (defn registry-entry [{:keys [cron-name cron-schedule timezone] @@ -57,12 +57,12 @@ (defn- due-cron-names [redis-conn] (redis-cmds/wcar* redis-conn - (car/zrangebyscore d/prefixed-cron-queue - redis-cmds/sorted-set-min - (u/epoch-time-ms) - "limit" - 0 - d/redis-cron-names-pop-limit))) + (car/zrangebyscore d/prefixed-cron-queue + redis-cmds/sorted-set-min + (u/epoch-time-ms) + "limit" + 0 + d/redis-cron-names-pop-limit))) (defn- ensure-sequential [thing] @@ -77,8 +77,8 @@ ;; ensure-sequential is necessary because if there's only one cron name, ;; wcar* will return nil or a single item instead of an empty/singleton list. (ensure-sequential - (redis-cmds/wcar* redis-conn - (doall (map #(car/hget d/prefixed-cron-entries %) cron-names)))))) + (redis-cmds/wcar* redis-conn + (doall (map #(car/hget d/prefixed-cron-entries %) cron-names)))))) (defn- create-job [{:keys [cron-schedule timezone job-description] @@ -106,10 +106,10 @@ [redis-conn & entry-names] (let [count-entry-names (count entry-names) [_ atomic-results] (redis-cmds/atomic - redis-conn - (car/multi) - (apply car/zrem d/prefixed-cron-queue entry-names) - (apply car/hdel d/prefixed-cron-entries entry-names))] + redis-conn + (car/multi) + (apply car/zrem d/prefixed-cron-queue entry-names) + (apply car/hdel d/prefixed-cron-entries entry-names))] (= [count-entry-names count-entry-names] atomic-results))) (defn get-all [redis-conn] diff --git a/src/goose/brokers/redis/heartbeat.clj b/src/goose/brokers/redis/heartbeat.clj index 7f7c4cd7..3752e633 100644 --- a/src/goose/brokers/redis/heartbeat.clj +++ b/src/goose/brokers/redis/heartbeat.clj @@ -1,8 +1,8 @@ (ns ^:no-doc goose.brokers.redis.heartbeat (:require - [goose.brokers.redis.commands :as redis-cmds] - [goose.defaults :as d] - [goose.utils :as u])) + [goose.brokers.redis.commands :as redis-cmds] + [goose.defaults :as d] + [goose.utils :as u])) (defn heartbeat-id [id] (str d/heartbeat-prefix id)) @@ -23,14 +23,14 @@ [{:keys [internal-thread-pool id redis-conn process-set graceful-shutdown-sec]}] (redis-cmds/add-to-set redis-conn process-set id) (u/log-on-exceptions - (u/while-pool - internal-thread-pool + (u/while-pool + internal-thread-pool ;; Goose stops sending heartbeat when shutdown is initialized. ;; Set expiry beyond graceful-shutdown time so in-progress jobs ;; aren't considered abandoned and double executions are avoided. - (let [expiry (max d/redis-heartbeat-expire-sec graceful-shutdown-sec)] - (redis-cmds/set-key-val redis-conn (heartbeat-id id) "alive" expiry)) - (u/sleep d/redis-heartbeat-sleep-sec)))) + (let [expiry (max d/redis-heartbeat-expire-sec graceful-shutdown-sec)] + (redis-cmds/set-key-val redis-conn (heartbeat-id id) "alive" expiry)) + (u/sleep d/redis-heartbeat-sleep-sec)))) (defn stop [{:keys [id redis-conn process-set in-progress-queue]}] diff --git a/src/goose/brokers/redis/metrics.clj b/src/goose/brokers/redis/metrics.clj index a10a12e3..28aa1b62 100644 --- a/src/goose/brokers/redis/metrics.clj +++ b/src/goose/brokers/redis/metrics.clj @@ -1,20 +1,22 @@ (ns ^:no-doc goose.brokers.redis.metrics (:require - [goose.brokers.redis.commands :as redis-cmds] - [goose.brokers.redis.cron :as cron] - [goose.brokers.redis.heartbeat :as heartbeat] - [goose.defaults :as d] - [goose.metrics :as m] - [goose.utils :as u] + [goose.brokers.redis.api.dead-jobs :as dead-jobs] + [goose.brokers.redis.api.scheduled-jobs :as scheduled-jobs] + [goose.brokers.redis.commands :as redis-cmds] + [goose.brokers.redis.cron :as cron-jobs] + [goose.brokers.redis.heartbeat :as heartbeat] + [goose.defaults :as d] + [goose.metrics :as m] + [goose.utils :as u] - [clj-statsd])) + [clj-statsd])) (defn- queue->count [redis-conn queues] (map - (fn [queue] - [(m/format-queue-count queue) (redis-cmds/list-size redis-conn queue)]) - queues)) + (fn [queue] + [(m/format-queue-count queue) (redis-cmds/list-size redis-conn queue)]) + queues)) (defn- enqueued-jobs->count [redis-conn] @@ -26,9 +28,9 @@ (defn- protected-queue-jobs->count [redis-conn] - {m/schedule-jobs-count (redis-cmds/sorted-set-size redis-conn d/prefixed-schedule-queue) - m/dead-jobs-count (redis-cmds/sorted-set-size redis-conn d/prefixed-dead-queue) - m/cron-count (cron/size redis-conn)}) + {m/schedule-jobs-count (scheduled-jobs/size redis-conn) + m/dead-jobs-count (dead-jobs/size redis-conn) + m/cron-jobs-count (cron-jobs/size redis-conn)}) (defn- batches->count [redis-conn] @@ -44,13 +46,13 @@ (defn run [{:keys [internal-thread-pool redis-conn metrics-plugin]}] (u/log-on-exceptions - (when (m/enabled? metrics-plugin) - (u/while-pool - internal-thread-pool + (when (m/enabled? metrics-plugin) + (u/while-pool + internal-thread-pool ;; Using doseq instead of map, because map is lazy. - (doseq [[k v] (all-jobs->count redis-conn)] - (m/gauge metrics-plugin k v {})) - (let [global-workers-count (heartbeat/global-workers-count redis-conn)] + (doseq [[k v] (all-jobs->count redis-conn)] + (m/gauge metrics-plugin k v {})) + (let [global-workers-count (heartbeat/global-workers-count redis-conn)] ;; Sleep for (global-workers-count) minutes + jitters. ;; On average, Goose sends queue level stats every 1 minute. - (u/sleep 60 global-workers-count)))))) + (u/sleep 60 global-workers-count)))))) diff --git a/src/goose/brokers/redis/orphan_checker.clj b/src/goose/brokers/redis/orphan_checker.clj index 77084f88..24f3203b 100644 --- a/src/goose/brokers/redis/orphan_checker.clj +++ b/src/goose/brokers/redis/orphan_checker.clj @@ -1,10 +1,10 @@ (ns ^:no-doc goose.brokers.redis.orphan-checker (:require - [goose.brokers.redis.commands :as redis-cmds] - [goose.brokers.redis.consumer :as redis-consumer] - [goose.brokers.redis.heartbeat :as heartbeat] - [goose.metrics :as m] - [goose.utils :as u])) + [goose.brokers.redis.commands :as redis-cmds] + [goose.brokers.redis.consumer :as redis-consumer] + [goose.brokers.redis.heartbeat :as heartbeat] + [goose.metrics :as m] + [goose.utils :as u])) (defn- replay-orphan-jobs [{:keys [redis-conn ready-queue metrics-plugin] :as opts} @@ -20,21 +20,21 @@ (doseq [process processes] (when-not (heartbeat/alive? redis-conn process) (trampoline - replay-orphan-jobs - opts (redis-consumer/preservation-queue process)) + replay-orphan-jobs + opts (redis-consumer/preservation-queue process)) (redis-cmds/del-from-set redis-conn process-set process)))) (defn run [{:keys [id internal-thread-pool redis-conn process-set] :as opts}] (u/log-on-exceptions - (u/while-pool - internal-thread-pool - (let [processes (redis-cmds/find-in-set redis-conn process-set identity)] - (check-liveness opts (remove #{id} processes))) - (let [local-workers-count (heartbeat/local-workers-count redis-conn process-set)] + (u/while-pool + internal-thread-pool + (let [processes (redis-cmds/find-in-set redis-conn process-set identity)] + (check-liveness opts (remove #{id} processes))) + (let [local-workers-count (heartbeat/local-workers-count redis-conn process-set)] ;; Scheduler & metrics runners derive sleep time from global-workers-count. ;; Orphan checker only recovers jobs from its ready queue; ;; hence it takes local-workers-count into account for sleeping. ;; Sleep for (local-workers-count) minutes + jitters. ;; On average, Goose checks for orphan jobs every 1 minute. - (u/sleep 60 local-workers-count))))) + (u/sleep 60 local-workers-count))))) diff --git a/src/goose/brokers/redis/retry.clj b/src/goose/brokers/redis/retry.clj index dcbd31ab..646a5455 100644 --- a/src/goose/brokers/redis/retry.clj +++ b/src/goose/brokers/redis/retry.clj @@ -1,9 +1,9 @@ (ns ^:no-doc goose.brokers.redis.retry (:require - [goose.brokers.redis.commands :as redis-cmds] - [goose.defaults :as d] - [goose.retry] - [goose.utils :as u])) + [goose.brokers.redis.commands :as redis-cmds] + [goose.defaults :as d] + [goose.retry] + [goose.utils :as u])) (defn- retry-job [{:keys [redis-conn error-service-config] diff --git a/src/goose/brokers/redis/scheduler.clj b/src/goose/brokers/redis/scheduler.clj index 2d04d75c..8e0b5e9e 100644 --- a/src/goose/brokers/redis/scheduler.clj +++ b/src/goose/brokers/redis/scheduler.clj @@ -1,13 +1,13 @@ (ns ^:no-doc goose.brokers.redis.scheduler (:require - [goose.brokers.redis.commands :as redis-cmds] - [goose.brokers.redis.cron :as cron] - [goose.brokers.redis.heartbeat :as heartbeat] - [goose.defaults :as d] - [goose.job :as job] - [goose.utils :as u] + [goose.brokers.redis.commands :as redis-cmds] + [goose.brokers.redis.cron :as cron] + [goose.brokers.redis.heartbeat :as heartbeat] + [goose.defaults :as d] + [goose.job :as job] + [goose.utils :as u] - [clojure.tools.logging :as log])) + [clojure.tools.logging :as log])) (defn run-at [redis-conn @@ -24,25 +24,25 @@ [redis-conn] (when-let [due-scheduled-jobs (redis-cmds/scheduled-jobs-due-now redis-conn d/prefixed-schedule-queue)] (redis-cmds/sorted-set->ready-queue - redis-conn - d/prefixed-schedule-queue - due-scheduled-jobs - job/ready-or-retry-queue) + redis-conn + d/prefixed-schedule-queue + due-scheduled-jobs + job/ready-or-retry-queue) true)) (defn run [{:keys [internal-thread-pool redis-conn scheduler-polling-interval-sec]}] (log/info "Polling scheduled jobs...") (u/log-on-exceptions - (u/while-pool - internal-thread-pool - (let [scheduled-jobs-found? (enqueue-due-scheduled-jobs redis-conn) - cron-entries-found? (cron/enqueue-due-cron-entries redis-conn)] - (when-not (or scheduled-jobs-found? - cron-entries-found?) + (u/while-pool + internal-thread-pool + (let [scheduled-jobs-found? (enqueue-due-scheduled-jobs redis-conn) + cron-entries-found? (cron/enqueue-due-cron-entries redis-conn)] + (when-not (or scheduled-jobs-found? + cron-entries-found?) ;; Goose only sleeps if no due jobs or cron entries are found. ;; If they are found, then Goose immediately polls to check ;; if more jobs are due. - (let [global-workers-count (heartbeat/global-workers-count redis-conn)] - (u/sleep scheduler-polling-interval-sec global-workers-count)))))) + (let [global-workers-count (heartbeat/global-workers-count redis-conn)] + (u/sleep scheduler-polling-interval-sec global-workers-count)))))) (log/info "Stopped scheduler. Exiting gracefully...")) diff --git a/src/goose/brokers/redis/worker.clj b/src/goose/brokers/redis/worker.clj index 8481b275..76d04305 100644 --- a/src/goose/brokers/redis/worker.clj +++ b/src/goose/brokers/redis/worker.clj @@ -1,23 +1,23 @@ (ns ^:no-doc goose.brokers.redis.worker (:require - [goose.brokers.redis.connection :as redis-connection] - [goose.brokers.redis.consumer :as redis-consumer] - [goose.brokers.redis.heartbeat :as redis-heartbeat] - [goose.brokers.redis.metrics :as redis-metrics] - [goose.brokers.redis.orphan-checker :as redis-orphan-checker] - [goose.brokers.redis.retry :as redis-retry] - [goose.brokers.redis.scheduler :as redis-scheduler] - [goose.consumer :as consumer] - [goose.defaults :as d] - [goose.job :as job] - [goose.metrics :as m] - [goose.utils :as u] - [goose.brokers.redis.batch :as batch] + [goose.brokers.redis.connection :as redis-connection] + [goose.brokers.redis.consumer :as redis-consumer] + [goose.brokers.redis.heartbeat :as redis-heartbeat] + [goose.brokers.redis.metrics :as redis-metrics] + [goose.brokers.redis.orphan-checker :as redis-orphan-checker] + [goose.brokers.redis.retry :as redis-retry] + [goose.brokers.redis.scheduler :as redis-scheduler] + [goose.consumer :as consumer] + [goose.defaults :as d] + [goose.job :as job] + [goose.metrics :as m] + [goose.utils :as u] + [goose.brokers.redis.batch :as batch] - [clojure.tools.logging :as log] - [com.climate.claypoole :as cp]) + [clojure.tools.logging :as log] + [com.climate.claypoole :as cp]) (:import - [java.util.concurrent TimeUnit])) + [java.util.concurrent TimeUnit])) (defn- internal-stop "Gracefully shuts down the worker threadpool." diff --git a/src/goose/brokers/rmq/api/dead_jobs.clj b/src/goose/brokers/rmq/api/dead_jobs.clj index c17936a9..a65d258f 100644 --- a/src/goose/brokers/rmq/api/dead_jobs.clj +++ b/src/goose/brokers/rmq/api/dead_jobs.clj @@ -1,12 +1,12 @@ (ns ^:no-doc goose.brokers.rmq.api.dead-jobs (:refer-clojure :exclude [pop]) (:require - [goose.brokers.rmq.commands :as rmq-cmds] - [goose.defaults :as d] - [goose.utils :as u] + [goose.brokers.rmq.commands :as rmq-cmds] + [goose.defaults :as d] + [goose.utils :as u] - [langohr.basic :as lb] - [langohr.queue :as lq])) + [langohr.basic :as lb] + [langohr.queue :as lq])) (defn size [ch] (lq/message-count ch d/prefixed-dead-queue)) diff --git a/src/goose/brokers/rmq/api/enqueued_jobs.clj b/src/goose/brokers/rmq/api/enqueued_jobs.clj index af63c5ac..fe8807ee 100644 --- a/src/goose/brokers/rmq/api/enqueued_jobs.clj +++ b/src/goose/brokers/rmq/api/enqueued_jobs.clj @@ -1,8 +1,8 @@ (ns ^:no-doc goose.brokers.rmq.api.enqueued-jobs (:require - [goose.defaults :as d] + [goose.defaults :as d] - [langohr.queue :as lq])) + [langohr.queue :as lq])) (defn size [ch queue] diff --git a/src/goose/brokers/rmq/broker.clj b/src/goose/brokers/rmq/broker.clj index a87791c5..2ba374c6 100644 --- a/src/goose/brokers/rmq/broker.clj +++ b/src/goose/brokers/rmq/broker.clj @@ -1,21 +1,21 @@ (ns goose.brokers.rmq.broker (:require - [goose.broker :as b] - [goose.brokers.rmq.api.dead-jobs :as dead-jobs] - [goose.brokers.rmq.api.enqueued-jobs :as enqueued-jobs] - [goose.brokers.rmq.commands :as rmq-cmds] - [goose.brokers.rmq.connection :as rmq-connection] - [goose.brokers.rmq.publisher-confirms :as publisher-confirms] - [goose.brokers.rmq.queue :as rmq-queue] - [goose.brokers.rmq.return-listener :as return-listener] - [goose.brokers.rmq.scheduler :as rmq-scheduler] - [goose.brokers.rmq.shutdown-listener :as shutdown-listener] - [goose.brokers.rmq.worker :as rmq-worker] - [goose.brokers.rmq.console :as console] - [goose.defaults :as d] - [goose.job :as job] - [goose.specs :as specs] - [goose.utils :as u])) + [goose.broker :as b] + [goose.brokers.rmq.api.dead-jobs :as dead-jobs] + [goose.brokers.rmq.api.enqueued-jobs :as enqueued-jobs] + [goose.brokers.rmq.commands :as rmq-cmds] + [goose.brokers.rmq.connection :as rmq-connection] + [goose.brokers.rmq.publisher-confirms :as publisher-confirms] + [goose.brokers.rmq.queue :as rmq-queue] + [goose.brokers.rmq.return-listener :as return-listener] + [goose.brokers.rmq.scheduler :as rmq-scheduler] + [goose.brokers.rmq.shutdown-listener :as shutdown-listener] + [goose.brokers.rmq.worker :as rmq-worker] + [goose.brokers.rmq.console :as console] + [goose.defaults :as d] + [goose.job :as job] + [goose.specs :as specs] + [goose.utils :as u])) (defprotocol Close "Closes connection to RabbitMQ Message Broker." diff --git a/src/goose/brokers/rmq/channel.clj b/src/goose/brokers/rmq/channel.clj index 589671ba..cb0b3382 100644 --- a/src/goose/brokers/rmq/channel.clj +++ b/src/goose/brokers/rmq/channel.clj @@ -1,12 +1,11 @@ (ns ^:no-doc goose.brokers.rmq.channel (:require - [goose.brokers.rmq.return-listener :as return-listener] - [goose.defaults :as d] - - [langohr.basic :as lb] - [langohr.channel :as lch] - [langohr.confirm :as lcnf])) + [goose.brokers.rmq.return-listener :as return-listener] + [goose.defaults :as d] + [langohr.basic :as lb] + [langohr.channel :as lch] + [langohr.confirm :as lcnf])) (defn open [conn diff --git a/src/goose/brokers/rmq/commands.clj b/src/goose/brokers/rmq/commands.clj index c0335fbe..a634cdad 100644 --- a/src/goose/brokers/rmq/commands.clj +++ b/src/goose/brokers/rmq/commands.clj @@ -1,15 +1,15 @@ (ns ^:no-doc goose.brokers.rmq.commands (:require - [goose.brokers.rmq.queue :as rmq-queue] - [goose.defaults :as d] - [goose.job :as job] - [goose.utils :as u] + [goose.brokers.rmq.queue :as rmq-queue] + [goose.defaults :as d] + [goose.job :as job] + [goose.utils :as u] - [clojure.tools.logging :as log] - [langohr.basic :as lb] - [langohr.confirm :as lcnf]) + [clojure.tools.logging :as log] + [langohr.basic :as lb] + [langohr.confirm :as lcnf]) (:import - (java.io IOException))) + (java.io IOException))) (defn- publish [ch exch queue job {:keys [mandatory priority headers]}] diff --git a/src/goose/brokers/rmq/connection.clj b/src/goose/brokers/rmq/connection.clj index df01aa5b..d07d9085 100644 --- a/src/goose/brokers/rmq/connection.clj +++ b/src/goose/brokers/rmq/connection.clj @@ -1,8 +1,8 @@ (ns ^:no-doc goose.brokers.rmq.connection (:require - [goose.brokers.rmq.channel :as rmq-channel] + [goose.brokers.rmq.channel :as rmq-channel] - [langohr.core :as lcore])) + [langohr.core :as lcore])) (defn open [{:keys [settings publisher-confirms return-listener shutdown-listener]} diff --git a/src/goose/brokers/rmq/console.clj b/src/goose/brokers/rmq/console.clj index acabe47d..acfba342 100644 --- a/src/goose/brokers/rmq/console.clj +++ b/src/goose/brokers/rmq/console.clj @@ -32,7 +32,7 @@ _ (declare-dead-queue broker) data {:dead (dead-jobs/size (u/random-element (:channels broker)))}] (response/response (view "Home" (assoc data :app-name app-name - :prefix-route prefix-route))))) + :prefix-route prefix-route))))) (defn job-page [{:keys [base-path total-jobs job replay-job-count] :as data}] [:div.rmq {:id "page"} @@ -108,9 +108,9 @@ :job (dead-jobs/pop (u/random-element (:channels broker)))} {:total-jobs 0})] (response/response (view "Dead" (assoc response :job-type :dead - :base-path (prefix-route "/dead") - :app-name app-name - :prefix-route prefix-route))))) + :base-path (prefix-route "/dead") + :app-name app-name + :prefix-route prefix-route))))) (defn- routes [route-prefix] [route-prefix [["" console/redirect-to-home-page] @@ -131,9 +131,9 @@ route-params :route-params} (-> route-prefix routes (bidi/match-route - uri - {:request-method - request-method}))] + uri + {:request-method + request-method}))] (-> req (update :params merge route-params) page-handler))) diff --git a/src/goose/brokers/rmq/consumer.clj b/src/goose/brokers/rmq/consumer.clj index bca77c02..3f6cbf0b 100644 --- a/src/goose/brokers/rmq/consumer.clj +++ b/src/goose/brokers/rmq/consumer.clj @@ -1,12 +1,12 @@ (ns ^:no-doc goose.brokers.rmq.consumer (:require - [goose.defaults :as d] - [goose.metrics :as m] - [goose.utils :as u] + [goose.defaults :as d] + [goose.metrics :as m] + [goose.utils :as u] - [clojure.tools.logging :as log] - [langohr.basic :as lb] - [langohr.consumers :as lc])) + [clojure.tools.logging :as log] + [langohr.basic :as lb] + [langohr.consumers :as lc])) (defn wrap-recovery-and-acks [next] @@ -49,12 +49,12 @@ (defn run [{:keys [channels ready-queue] :as opts}] (doall ; Using `doall` to immediately start a consumer. - (for [ch channels] - (do - (lb/qos ch d/rmq-prefetch-limit) ; Set prefetch-limit to 1. - (let [subscriber-opts {:auto-ack false - :handle-cancel-ok #(cancel-ok ready-queue %) - :handle-consume-ok #(consume-ok ready-queue %) - :handle-recover-ok #(recover-ok ready-queue) - :handle-shutdown-signal shutdown-signal}] - [ch (lc/subscribe ch ready-queue #(handler opts %1 %2 %3) subscriber-opts)]))))) + (for [ch channels] + (do + (lb/qos ch d/rmq-prefetch-limit) ; Set prefetch-limit to 1. + (let [subscriber-opts {:auto-ack false + :handle-cancel-ok #(cancel-ok ready-queue %) + :handle-consume-ok #(consume-ok ready-queue %) + :handle-recover-ok #(recover-ok ready-queue) + :handle-shutdown-signal shutdown-signal}] + [ch (lc/subscribe ch ready-queue #(handler opts %1 %2 %3) subscriber-opts)]))))) diff --git a/src/goose/brokers/rmq/publisher_confirms.clj b/src/goose/brokers/rmq/publisher_confirms.clj index d18b5528..cadb7b59 100644 --- a/src/goose/brokers/rmq/publisher_confirms.clj +++ b/src/goose/brokers/rmq/publisher_confirms.clj @@ -1,9 +1,9 @@ (ns goose.brokers.rmq.publisher-confirms (:refer-clojure :exclude [sync]) (:require - [goose.defaults :as d] + [goose.defaults :as d] - [clojure.tools.logging :as log])) + [clojure.tools.logging :as log])) (defn default-ack-handler [ch-number delivery-tag multiple] diff --git a/src/goose/brokers/rmq/queue.clj b/src/goose/brokers/rmq/queue.clj index 99d75979..7698754c 100644 --- a/src/goose/brokers/rmq/queue.clj +++ b/src/goose/brokers/rmq/queue.clj @@ -1,10 +1,10 @@ (ns goose.brokers.rmq.queue (:refer-clojure :exclude [declare]) (:require - [goose.defaults :as d] + [goose.defaults :as d] - [langohr.exchange :as lex] - [langohr.queue :as lq])) + [langohr.exchange :as lex] + [langohr.queue :as lq])) (def classic {:type d/rmq-classic-queue}) diff --git a/src/goose/brokers/rmq/retry.clj b/src/goose/brokers/rmq/retry.clj index c47afd93..04ab698c 100644 --- a/src/goose/brokers/rmq/retry.clj +++ b/src/goose/brokers/rmq/retry.clj @@ -1,10 +1,10 @@ (ns ^:no-doc goose.brokers.rmq.retry (:require - [goose.brokers.rmq.commands :as rmq-cmds] - [goose.defaults :as d] - [goose.retry] - [goose.job :as job] - [goose.utils :as u])) + [goose.brokers.rmq.commands :as rmq-cmds] + [goose.defaults :as d] + [goose.retry] + [goose.job :as job] + [goose.utils :as u])) (defn- retry-job [{:keys [ch queue-type publisher-confirms error-service-config] diff --git a/src/goose/brokers/rmq/return_listener.clj b/src/goose/brokers/rmq/return_listener.clj index 56feb657..b4ec3873 100644 --- a/src/goose/brokers/rmq/return_listener.clj +++ b/src/goose/brokers/rmq/return_listener.clj @@ -1,8 +1,8 @@ (ns goose.brokers.rmq.return-listener (:require - [goose.utils :as u] + [goose.utils :as u] - [clojure.tools.logging :as log])) + [clojure.tools.logging :as log])) (defn default "Sample handler for unroutable messages." diff --git a/src/goose/brokers/rmq/scheduler.clj b/src/goose/brokers/rmq/scheduler.clj index 0030fc14..8fa637ae 100644 --- a/src/goose/brokers/rmq/scheduler.clj +++ b/src/goose/brokers/rmq/scheduler.clj @@ -1,7 +1,7 @@ (ns ^:no-doc goose.brokers.rmq.scheduler (:require - [goose.brokers.rmq.commands :as rmq-cmds] - [goose.utils :as u])) + [goose.brokers.rmq.commands :as rmq-cmds] + [goose.utils :as u])) (defn run-at [ch queue-opts publisher-confirms schedule-epoch-ms job] diff --git a/src/goose/brokers/rmq/worker.clj b/src/goose/brokers/rmq/worker.clj index 6cf27188..a2d35b0c 100644 --- a/src/goose/brokers/rmq/worker.clj +++ b/src/goose/brokers/rmq/worker.clj @@ -1,18 +1,18 @@ (ns ^:no-doc goose.brokers.rmq.worker (:require - [goose.brokers.rmq.connection :as rmq-connection] - [goose.brokers.rmq.consumer :as rmq-consumer] - [goose.brokers.rmq.queue :as rmq-queue] - [goose.brokers.rmq.retry :as rmq-retry] - [goose.consumer :as consumer] - [goose.defaults :as d] - [goose.job :as job] - [goose.metrics :as m] - [goose.utils :as u] + [goose.brokers.rmq.connection :as rmq-connection] + [goose.brokers.rmq.consumer :as rmq-consumer] + [goose.brokers.rmq.queue :as rmq-queue] + [goose.brokers.rmq.retry :as rmq-retry] + [goose.consumer :as consumer] + [goose.defaults :as d] + [goose.job :as job] + [goose.metrics :as m] + [goose.utils :as u] - [clojure.tools.logging :as log] - [com.climate.claypoole :as cp] - [langohr.basic :as lb])) + [clojure.tools.logging :as log] + [com.climate.claypoole :as cp] + [langohr.basic :as lb])) (defn- await-execution [thread-pool graceful-shutdown-sec] diff --git a/src/goose/client.clj b/src/goose/client.clj index 10208089..37c02b28 100644 --- a/src/goose/client.clj +++ b/src/goose/client.clj @@ -1,14 +1,14 @@ (ns goose.client "Functions for executing job in async, scheduled or periodic manner." (:require - [goose.batch :as batch] - [goose.broker :as b] - [goose.defaults :as d] - [goose.job :as j] - [goose.retry :as retry] - [goose.utils :as u]) + [goose.batch :as batch] + [goose.broker :as b] + [goose.defaults :as d] + [goose.job :as j] + [goose.retry :as retry] + [goose.utils :as u]) (:import - (java.time Instant))) + (java.time Instant))) (def default-opts "Map of sample configs for producing jobs. @@ -30,8 +30,8 @@ (defn- prefix-queues-inside-opts [{:keys [queue retry-opts] :as opts}] (assoc opts - :ready-queue (d/prefix-queue queue) - :retry-opts (retry/prefix-queue-if-present retry-opts))) + :ready-queue (d/prefix-queue queue) + :retry-opts (retry/prefix-queue-if-present retry-opts))) (defn- register-cron-schedule [{:keys [broker queue ready-queue retry-opts] :as _opts} @@ -161,7 +161,7 @@ (perform-every client-opts cron-opts `send-emails \"subject\" \"body\" [:user-1 :user-2])) ``` - - [Cron wiki](https://github.com/nilenso/goose/wiki/Cron)" + - [Cron Jobs wiki](https://github.com/nilenso/goose/wiki/Cron-Jobs)" [opts cron-opts execute-fn-sym & args] (let [internal-opts (prefix-queues-inside-opts opts)] (register-cron-schedule internal-opts cron-opts execute-fn-sym args))) diff --git a/src/goose/console.clj b/src/goose/console.clj index 0fb72b43..0fba728c 100644 --- a/src/goose/console.clj +++ b/src/goose/console.clj @@ -9,8 +9,8 @@ [ring.middleware.params :as ring-params] [ring.util.response :as response]) (:import - (java.lang Character String) - (java.util Date))) + (java.lang Character String) + (java.util Date))) (defn ^:no-doc format-arg [arg] (condp = (type arg) @@ -219,7 +219,7 @@ [:input#intervalSlider {:type "range" :min 2 :max 10 :step 2 :value 2}]] [:button#pollButton.btn.btn-danger "Live poll"] [:button#stopButton.btn.btn-success.stopButton - {:style {:display "none"}}"Stop poll"] + {:style {:display "none"}} "Stop poll"] [:label.toggle-switch [:input {:type "checkbox" :id "isThemeDark"}] [:div.toggle-switch-label diff --git a/src/goose/consumer.clj b/src/goose/consumer.clj index b2c1c28b..e3d6144e 100644 --- a/src/goose/consumer.clj +++ b/src/goose/consumer.clj @@ -1,6 +1,6 @@ (ns ^:no-doc goose.consumer (:require - [goose.utils :as u])) + [goose.utils :as u])) (defn execute-job [_opts {:keys [execute-fn-sym args]}] diff --git a/src/goose/cron/parsing.clj b/src/goose/cron/parsing.clj index 637f9198..8b4ec097 100644 --- a/src/goose/cron/parsing.clj +++ b/src/goose/cron/parsing.clj @@ -1,10 +1,10 @@ (ns ^:no-doc goose.cron.parsing (:import - (com.cronutils.model CronType) - (com.cronutils.model.definition CronDefinitionBuilder) - (com.cronutils.model.time ExecutionTime) - (com.cronutils.parser CronParser) - (java.time ZonedDateTime ZoneId))) + (com.cronutils.model CronType) + (com.cronutils.model.definition CronDefinitionBuilder) + (com.cronutils.model.time ExecutionTime) + (com.cronutils.parser CronParser) + (java.time ZonedDateTime ZoneId))) (defn parse-cron [cron-schedule] @@ -20,7 +20,6 @@ (catch IllegalArgumentException _ false))) - (defn next-run-epoch-ms [cron-schedule timezone] (let [zone (ZoneId/of timezone)] diff --git a/src/goose/defaults.clj b/src/goose/defaults.clj index 60e9dcf4..39b2d397 100644 --- a/src/goose/defaults.clj +++ b/src/goose/defaults.clj @@ -1,7 +1,7 @@ (ns goose.defaults "All default configurations for Goose are defined here." (:require - [clojure.string :as str])) + [clojure.string :as str])) (def worker-threads 5) (def graceful-shutdown-sec 30) diff --git a/src/goose/job.clj b/src/goose/job.clj index 5c29ccc8..004f950a 100644 --- a/src/goose/job.clj +++ b/src/goose/job.clj @@ -1,7 +1,7 @@ (ns ^:no-doc goose.job (:require - [goose.metrics :as m] - [goose.utils :as u])) + [goose.metrics :as m] + [goose.utils :as u])) (defn new [execute-fn-sym args queue ready-queue retry-opts] @@ -39,8 +39,8 @@ (defn from-description [job-description] (assoc job-description - :id (str (random-uuid)) - :enqueued-at (u/epoch-time-ms))) + :id (str (random-uuid)) + :enqueued-at (u/epoch-time-ms))) (defn- calculate-latency [job] diff --git a/src/goose/metrics.clj b/src/goose/metrics.clj index 3bc3314d..f56da3dd 100644 --- a/src/goose/metrics.clj +++ b/src/goose/metrics.clj @@ -2,10 +2,10 @@ "Defines protocol for Metrics Backend. - [Monitoring & Alerting wiki](https://github.com/nilenso/goose/wiki/Monitoring-&-Alerting)" (:require - [goose.defaults :as d] - [goose.utils :as u] + [goose.defaults :as d] + [goose.utils :as u] - [clojure.tools.logging :as log])) + [clojure.tools.logging :as log])) (defonce ^:no-doc jobs-processed "jobs.processed") (defonce ^:no-doc jobs-success "jobs.succeeded") @@ -28,7 +28,7 @@ (format "enqueued_jobs.%s.count" (d/affix-queue queue))) (defonce ^:no-doc total-enqueued-jobs-count "total_enqueued_jobs.count") (defonce ^:no-doc schedule-jobs-count "scheduled_jobs.count") -(defonce ^:no-doc cron-count "cron.count") +(defonce ^:no-doc cron-jobs-count "cron_jobs.count") (defonce ^:no-doc dead-jobs-count "dead_jobs.count") (defonce ^:no-doc batches-count "batches.count") diff --git a/src/goose/metrics/statsd.clj b/src/goose/metrics/statsd.clj index 01f7d66b..cdc7a074 100644 --- a/src/goose/metrics/statsd.clj +++ b/src/goose/metrics/statsd.clj @@ -2,10 +2,10 @@ "StatsD is a specimen Metrics Backend for Goose. Plugins can be customized by implementing [[goose.metrics/Metrics]] protocol." (:require - [goose.metrics :as m] - [goose.specs :as specs] + [goose.metrics :as m] + [goose.specs :as specs] - [clj-statsd])) + [clj-statsd])) (defn- build-tags [tags] diff --git a/src/goose/retry.clj b/src/goose/retry.clj index 46cfe4e0..9a080be1 100644 --- a/src/goose/retry.clj +++ b/src/goose/retry.clj @@ -1,9 +1,9 @@ (ns goose.retry (:require - [goose.defaults :as d] - [goose.utils :as u] + [goose.defaults :as d] + [goose.utils :as u] - [clojure.tools.logging :as log])) + [clojure.tools.logging :as log])) (defn default-error-handler "Sample error handler of a Job.\\ diff --git a/src/goose/specs.clj b/src/goose/specs.clj index be203763..9aca5098 100644 --- a/src/goose/specs.clj +++ b/src/goose/specs.clj @@ -1,20 +1,20 @@ (ns goose.specs (:require - [goose.broker :as b] - [goose.client :as c] - [goose.cron.parsing :as cron-parsing] - [goose.defaults :as d] - [goose.metrics :as m] - [goose.utils :as u] - [goose.console :as console] - - [clojure.spec.alpha :as s] - [clojure.spec.test.alpha :as st] - [clojure.string :as str] - [taoensso.carmine.connections :refer [IConnectionPool]]) + [goose.broker :as b] + [goose.client :as c] + [goose.cron.parsing :as cron-parsing] + [goose.defaults :as d] + [goose.metrics :as m] + [goose.utils :as u] + [goose.console :as console] + + [clojure.spec.alpha :as s] + [clojure.spec.test.alpha :as st] + [clojure.string :as str] + [taoensso.carmine.connections :refer [IConnectionPool]]) (:import - (java.time Instant ZoneId) - (java.util Arrays))) + (java.time Instant ZoneId) + (java.util Arrays))) ;;; ========== Qualified Function Symbols ============== (s/def ::fn-sym (s/and qualified-symbol? resolve #(fn? @(resolve %)))) @@ -128,19 +128,19 @@ (s/def ::skip-dead-queue boolean?) (s/def ::retry-opts (s/and - (s/map-of #{:max-retries - :retry-delay-sec-fn-sym - :retry-queue - :error-handler-fn-sym - :death-handler-fn-sym - :skip-dead-queue} - any?) - (s/keys :req-un [::max-retries - ::retry-delay-sec-fn-sym - ::error-handler-fn-sym - ::death-handler-fn-sym - ::skip-dead-queue] - :opt-un [::retry-queue]))) + (s/map-of #{:max-retries + :retry-delay-sec-fn-sym + :retry-queue + :error-handler-fn-sym + :death-handler-fn-sym + :skip-dead-queue} + any?) + (s/keys :req-un [::max-retries + ::retry-delay-sec-fn-sym + ::error-handler-fn-sym + ::death-handler-fn-sym + ::skip-dead-queue] + :opt-un [::retry-queue]))) ;;; ============== StatsD Metrics ============== (s/def :goose.specs.statsd/enabled? boolean?) @@ -193,42 +193,42 @@ ;;; ============== FDEFs ============== (s/fdef c/perform-async - :args (s/cat :opts ::client-opts - :execute-fn-sym ::fn-sym - :args (s/* ::args-serializable?)) - :ret map?) + :args (s/cat :opts ::client-opts + :execute-fn-sym ::fn-sym + :args (s/* ::args-serializable?)) + :ret map?) (s/fdef c/perform-at - :args (s/cat :opts ::client-opts - :instant ::instant - :execute-fn-sym ::fn-sym - :args (s/* ::args-serializable?)) - :ret map?) + :args (s/cat :opts ::client-opts + :instant ::instant + :execute-fn-sym ::fn-sym + :args (s/* ::args-serializable?)) + :ret map?) (s/fdef c/perform-in-sec - :args (s/cat :opts ::client-opts - :sec int? - :execute-fn-sym ::fn-sym - :args (s/* ::args-serializable?)) - :ret map?) + :args (s/cat :opts ::client-opts + :sec int? + :execute-fn-sym ::fn-sym + :args (s/* ::args-serializable?)) + :ret map?) (s/fdef c/perform-every - :args (s/cat :opts ::client-opts - :cron-opts ::cron-opts - :execute-fn-sym ::fn-sym - :args (s/* ::args-serializable?)) - :ret map?) + :args (s/cat :opts ::client-opts + :cron-opts ::cron-opts + :execute-fn-sym ::fn-sym + :args (s/* ::args-serializable?)) + :ret map?) (s/fdef c/perform-batch - :args (s/cat :opts ::client-opts - :batch-opts ::batch-opts - :execute-fn-sym ::fn-sym - :args ::batch-args) - :ret map?) + :args (s/cat :opts ::client-opts + :batch-opts ::batch-opts + :execute-fn-sym ::fn-sym + :args ::batch-args) + :ret map?) (s/fdef console/app-handler - :args (s/cat :console-opts ::console-opts - :req map?)) + :args (s/cat :console-opts ::console-opts + :req map?)) (def ^:private fns-with-specs [`c/perform-async @@ -262,16 +262,16 @@ ;;; the purpose of `clojure.repl/doc`. (s/fdef goose.brokers.redis.broker/new-producer - :args (s/cat :redis ::redis-conn-opts) - :ret ::broker) + :args (s/cat :redis ::redis-conn-opts) + :ret ::broker) (defn ^:no-doc assert-redis-producer [conn-opts] (assert-specs 'goose.brokers.redis.broker/new-producer ::redis-conn-opts conn-opts)) (s/fdef goose.brokers.redis.broker/new-consumer - :args (s/alt :one (s/cat :redis ::redis-conn-opts) - :two (s/cat :redis ::redis-conn-opts - :scheduler-polling-interval-sec ::redis-scheduler-polling-interval-sec)) - :ret ::broker) + :args (s/alt :one (s/cat :redis ::redis-conn-opts) + :two (s/cat :redis ::redis-conn-opts + :scheduler-polling-interval-sec ::redis-scheduler-polling-interval-sec)) + :ret ::broker) (defn ^:no-doc assert-redis-consumer [conn-opts scheduler-polling-interval-sec] (assert-specs 'goose.brokers.redis.broker/new-consumer ::redis-conn-opts conn-opts) @@ -280,28 +280,28 @@ scheduler-polling-interval-sec)) (s/fdef goose.brokers.rmq.broker/new-producer - :args (s/alt :one (s/cat :opts ::rmq) - :two (s/cat :opts ::rmq - :channels pos-int?)) - :ret ::broker) + :args (s/alt :one (s/cat :opts ::rmq) + :two (s/cat :opts ::rmq + :channels pos-int?)) + :ret ::broker) (defn ^:no-doc assert-rmq-producer [opts channels] (assert-specs 'goose.brokers.rmq.broker/new-producer ::rmq opts) (assert-specs 'goose.brokers.rmq.broker/new-producer pos-int? channels)) (s/fdef goose.brokers.rmq.broker/new-consumer - :args (s/cat :opts ::rmq) - :ret ::broker) + :args (s/cat :opts ::rmq) + :ret ::broker) (defn ^:no-doc assert-rmq-consumer [opts] (assert-specs 'goose.brokers.rmq.broker/new-consumer ::rmq opts)) (s/fdef goose.metrics.statsd/new - :args (s/cat :opts ::statsd-opts) - :ret ::metrics-plugin) + :args (s/cat :opts ::statsd-opts) + :ret ::metrics-plugin) (defn ^:no-doc assert-statsd [opts] (assert-specs 'goose.metrics.statsd/new ::statsd-opts opts)) (s/fdef goose.worker/start - :args (s/cat :opts ::worker-opts)) + :args (s/cat :opts ::worker-opts)) (defn ^:no-doc assert-worker [opts] (assert-specs 'goose.worker/start ::worker-opts opts)) diff --git a/src/goose/utils.clj b/src/goose/utils.clj index 240ffee1..31dc7513 100644 --- a/src/goose/utils.clj +++ b/src/goose/utils.clj @@ -1,14 +1,14 @@ (ns goose.utils (:refer-clojure :exclude [list]) (:require - [clojure.string :as str] - [clojure.tools.logging :as log] - [com.climate.claypoole :as cp] - [taoensso.nippy :as nippy]) + [clojure.string :as str] + [clojure.tools.logging :as log] + [com.climate.claypoole :as cp] + [taoensso.nippy :as nippy]) (:import - (java.net InetAddress) - (java.time Instant) - (java.lang Math))) + (java.net InetAddress) + (java.time Instant) + (java.lang Math))) (defn encode "Serializes input to a byte array using `taoensso.nippy/freeze`.\\ diff --git a/src/goose/worker.clj b/src/goose/worker.clj index 8f971b57..9b3d5b28 100644 --- a/src/goose/worker.clj +++ b/src/goose/worker.clj @@ -1,8 +1,8 @@ (ns goose.worker (:require - [goose.broker :as b] - [goose.defaults :as d] - [goose.specs :as specs])) + [goose.broker :as b] + [goose.defaults :as d] + [goose.specs :as specs])) (defprotocol Shutdown ;; We're extending a protocol via metadata because reloading REPL diff --git a/test/goose/brokers/redis/api_test.clj b/test/goose/brokers/redis/api_test.clj index aa72e483..e7125ddb 100644 --- a/test/goose/brokers/redis/api_test.clj +++ b/test/goose/brokers/redis/api_test.clj @@ -1,26 +1,26 @@ (ns goose.brokers.redis.api-test (:require - [clojure.test :refer [deftest is testing use-fixtures]] - [goose.api.batch :as batch] - [goose.api.cron-jobs :as cron-jobs] - [goose.api.dead-jobs :as dead-jobs] - [goose.api.enqueued-jobs :as enqueued-jobs] - [goose.api.scheduled-jobs :as scheduled-jobs] - [goose.batch] - [goose.brokers.redis.api.dead-jobs :as redis-dead-jobs] - [goose.brokers.redis.api.enqueued-jobs :as redis-enqueued-jobs] - [goose.brokers.redis.api.scheduled-jobs :as redis-scheduled-jobs] - [goose.brokers.redis.commands :as redis-cmds] - [goose.brokers.redis.cron :as redis-cron] - [goose.client :as c] - [goose.defaults :as d] - [goose.factories :as f] - [goose.test-utils :as tu] - [goose.utils :as u] - - [goose.worker :as w]) + [clojure.test :refer [deftest is testing use-fixtures]] + [goose.api.batch :as batch] + [goose.api.cron-jobs :as cron-jobs] + [goose.api.dead-jobs :as dead-jobs] + [goose.api.enqueued-jobs :as enqueued-jobs] + [goose.api.scheduled-jobs :as scheduled-jobs] + [goose.batch] + [goose.brokers.redis.api.dead-jobs :as redis-dead-jobs] + [goose.brokers.redis.api.enqueued-jobs :as redis-enqueued-jobs] + [goose.brokers.redis.api.scheduled-jobs :as redis-scheduled-jobs] + [goose.brokers.redis.commands :as redis-cmds] + [goose.brokers.redis.cron :as redis-cron-jobs] + [goose.client :as c] + [goose.defaults :as d] + [goose.factories :as f] + [goose.test-utils :as tu] + [goose.utils :as u] + + [goose.worker :as w]) (:import - (java.time ZoneId))) + (java.time ZoneId))) ;;; ======= Setup & Teardown ========== (use-fixtures :each tu/redis-fixture) @@ -45,14 +45,14 @@ (testing "[redis] enqueued-jobs API over empty list" (let [queues (tu/with-timeout default-timeout-ms - (enqueued-jobs/list-all-queues tu/redis-producer))] + (enqueued-jobs/list-all-queues tu/redis-producer))] (is (and (not= :timed-out queues) (empty? queues)))) (let [jobs (tu/with-timeout default-timeout-ms - (enqueued-jobs/find-by-pattern tu/redis-producer tu/queue (constantly true)))] + (enqueued-jobs/find-by-pattern tu/redis-producer tu/queue (constantly true)))] (is (and (not= :timed-out jobs) (empty? jobs)))) (let [job-id (str (random-uuid))] (is (nil? (tu/with-timeout default-timeout-ms - (enqueued-jobs/find-by-id tu/redis-producer tu/queue job-id))))))) + (enqueued-jobs/find-by-id tu/redis-producer tu/queue job-id))))))) (deftest enqueued-jobs-get-by-range-test (let [[id1 id2 id3] (doall (for [arg [1 2 3]] @@ -122,11 +122,11 @@ (testing "[redis] scheduled-jobs API over empty list" (let [jobs (tu/with-timeout default-timeout-ms - (scheduled-jobs/find-by-pattern tu/redis-producer (constantly true)))] + (scheduled-jobs/find-by-pattern tu/redis-producer (constantly true)))] (is (and (not= :timed-out jobs) (empty? jobs)))) (let [job-id (str (random-uuid))] (is (nil? (tu/with-timeout default-timeout-ms - (scheduled-jobs/find-by-id tu/redis-producer job-id))))))) + (scheduled-jobs/find-by-id tu/redis-producer job-id))))))) (deftest scheduled-jobs-prioritise-test (testing "Should prioritise only one job" @@ -195,10 +195,10 @@ (deftest cron-get-all (testing "Should not get any jobs if no cron exist" - (is (= [] (redis-cron/get-all tu/redis-conn)))) + (is (= [] (redis-cron-jobs/get-all tu/redis-conn)))) (testing "Should get all the crons" (f/create-jobs-in-redis {:cron 1}) - (let [jobs (redis-cron/get-all tu/redis-conn)] + (let [jobs (redis-cron-jobs/get-all tu/redis-conn)] (is (= 1 (count jobs))) (is (every? true? (map #(contains? % :cron-name) jobs)))))) @@ -206,9 +206,9 @@ (testing "Should delete one cron job" (f/create-jobs-in-redis {:cron 1} {:cron {:cron-opts {:cron-name "foo-job"}}}) - (is (= 1 (redis-cron/size tu/redis-conn))) - (is (true? (redis-cron/delete tu/redis-conn "foo-job"))) - (is (= 0 (redis-cron/size tu/redis-conn)))) + (is (= 1 (redis-cron-jobs/size tu/redis-conn))) + (is (true? (redis-cron-jobs/delete tu/redis-conn "foo-job"))) + (is (= 0 (redis-cron-jobs/size tu/redis-conn)))) (tu/clear-redis) (testing "Should delete multiple cron jobs" (f/create-jobs-in-redis {:cron 1} @@ -217,18 +217,18 @@ {:cron {:cron-opts {:cron-name "foo-job-2"}}}) (f/create-jobs-in-redis {:cron 1} {:cron {:cron-opts {:cron-name "foo-job-3"}}}) - (is (= 3 (redis-cron/size tu/redis-conn))) - (is (true? (redis-cron/delete tu/redis-conn "foo-job-1" "foo-job-3"))) - (is (= 1 (redis-cron/size tu/redis-conn)))) + (is (= 3 (redis-cron-jobs/size tu/redis-conn))) + (is (true? (redis-cron-jobs/delete tu/redis-conn "foo-job-1" "foo-job-3"))) + (is (= 1 (redis-cron-jobs/size tu/redis-conn)))) (tu/clear-redis) (testing "Should remove only valid cron jobs" (f/create-jobs-in-redis {:cron 1} {:cron {:cron-opts {:cron-name "barjob1"}}}) (f/create-jobs-in-redis {:cron 1} {:cron {:cron-opts {:cron-name "barjob2"}}}) - (is (= 2 (redis-cron/size tu/redis-conn))) - (is (false? (redis-cron/delete tu/redis-conn "invalid-cron-name" "barjob2"))) - (is (= 1 (redis-cron/size tu/redis-conn))))) + (is (= 2 (redis-cron-jobs/size tu/redis-conn))) + (is (false? (redis-cron-jobs/delete tu/redis-conn "invalid-cron-name" "barjob2"))) + (is (= 1 (redis-cron-jobs/size tu/redis-conn))))) (deftest dead-jobs-delete-test (testing "Should delete a single job and return true" @@ -335,11 +335,11 @@ (testing "[redis] dead-jobs API over empty list" (let [jobs (tu/with-timeout default-timeout-ms - (dead-jobs/find-by-pattern tu/redis-producer (constantly true)))] + (dead-jobs/find-by-pattern tu/redis-producer (constantly true)))] (is (and (not= :timed-out jobs) (empty? jobs)))) (let [job-id (str (random-uuid))] (is (nil? (tu/with-timeout default-timeout-ms - (dead-jobs/find-by-id tu/redis-producer job-id))))))) + (dead-jobs/find-by-id tu/redis-producer job-id))))))) (deftest cron-entries-test (testing "cron entries API" @@ -355,7 +355,6 @@ (is (= "* * * * *" (:cron-schedule recurring-job))) (is (= "US/Pacific" (:timezone recurring-job)))) - (is (= "my-cron-entry" (:cron-name (cron-jobs/find-by-name tu/redis-producer "my-cron-entry")))) (is (= "* * * * *" diff --git a/test/goose/brokers/redis/commands_test.clj b/test/goose/brokers/redis/commands_test.clj index f8ab975c..0576faa2 100644 --- a/test/goose/brokers/redis/commands_test.clj +++ b/test/goose/brokers/redis/commands_test.clj @@ -1,11 +1,11 @@ (ns goose.brokers.redis.commands-test (:require - [goose.brokers.redis.commands :as redis-cmds] - [goose.test-utils :as tu] + [goose.brokers.redis.commands :as redis-cmds] + [goose.test-utils :as tu] - [clojure.string :as str] - [clojure.test :refer [deftest is testing use-fixtures]] - [taoensso.carmine :as car])) + [clojure.string :as str] + [clojure.test :refer [deftest is testing use-fixtures]] + [taoensso.carmine :as car])) (use-fixtures :each tu/redis-fixture) @@ -14,7 +14,7 @@ (let [ks (map #(str "foo" %) (range 5000))] (doseq [k ks] (redis-cmds/wcar* tu/redis-conn - (car/set k 42))) + (car/set k 42))) (is (= (set ks) (set (redis-cmds/scan-seq tu/redis-conn)))))) @@ -23,25 +23,25 @@ bar-keys (map #(str "bar" %) (range 5000))] (doseq [k (concat foo-keys bar-keys)] (redis-cmds/wcar* tu/redis-conn - (car/set k 42))) + (car/set k 42))) (is (= (set foo-keys) (set (redis-cmds/scan-seq tu/redis-conn (fn [conn _ cursor] (redis-cmds/wcar* conn - (car/scan cursor "MATCH" "foo*" "COUNT" 1))))))))) + (car/scan cursor "MATCH" "foo*" "COUNT" 1))))))))) (testing "scanning a data structure at a particular key" (let [set-members (set (map #(str "foo" %) (range 5000)))] (doseq [member set-members] (redis-cmds/wcar* tu/redis-conn - (car/sadd "my-set" member))) + (car/sadd "my-set" member))) (is (= set-members (set (redis-cmds/scan-seq tu/redis-conn (fn [conn redis-key cursor] (redis-cmds/wcar* conn - (car/sscan redis-key cursor "MATCH" "*" "COUNT" 1))) + (car/sscan redis-key cursor "MATCH" "*" "COUNT" 1))) "my-set"))))))) (deftest find-in-set-test @@ -50,7 +50,7 @@ bar-members (map #(str "bar" %) (range 1000))] (doseq [member (concat foo-members bar-members)] (redis-cmds/wcar* tu/redis-conn - (car/sadd "my-set" member))) + (car/sadd "my-set" member))) (is (= (set foo-members) (set (redis-cmds/find-in-set tu/redis-conn @@ -105,7 +105,7 @@ (redis-cmds/del-from-list-and-enqueue-front tu/redis-conn "my-list" "foo2" "foo0"))) (is (= ["foo0" "foo2" "foo1" "foo3" "foo4"] (redis-cmds/range-from-front tu/redis-conn "my-list" 0 4))) - (is (= [[["OK" "QUEUED" "QUEUED"] [1 5]]](redis-cmds/del-from-list-and-enqueue-front tu/redis-conn "my-list" "foo3"))) + (is (= [[["OK" "QUEUED" "QUEUED"] [1 5]]] (redis-cmds/del-from-list-and-enqueue-front tu/redis-conn "my-list" "foo3"))) (is (= ["foo3" "foo0" "foo2" "foo1" "foo4"] (redis-cmds/range-from-front tu/redis-conn "my-list" 0 4))))) (deftest find-in-list-test @@ -164,7 +164,7 @@ bar-members (set (map #(str "bar" %) (range 1000)))] (doseq [member (concat foo-members bar-members)] (redis-cmds/wcar* tu/redis-conn - (car/zadd "my-zset" (rand-int 100) member))) + (car/zadd "my-zset" (rand-int 100) member))) (is (= bar-members (set (redis-cmds/find-in-sorted-set tu/redis-conn @@ -176,7 +176,7 @@ (let [foo-members (set (map #(str "foo" %) (range 1000)))] (doseq [member foo-members] (redis-cmds/wcar* tu/redis-conn - (car/zadd "my-other-zset" (rand-int 100) member))) + (car/zadd "my-other-zset" (rand-int 100) member))) (is (= 500 (count (redis-cmds/find-in-sorted-set tu/redis-conn diff --git a/test/goose/brokers/redis/console/page_test.clj b/test/goose/brokers/redis/console/page_test.clj index a437fe2c..2e967efe 100644 --- a/test/goose/brokers/redis/console/page_test.clj +++ b/test/goose/brokers/redis/console/page_test.clj @@ -156,24 +156,24 @@ (is (nil? (:queue (specs/validate-req-params {:queue :not-string})))) (is (= "some-encoded-job" (:encoded-job (specs/validate-req-params - {:job "some-encoded-job"})))) + {:job "some-encoded-job"})))) (is (nil? (:encoded-job (specs/validate-req-params - {:job {:id "123"}})))) + {:job {:id "123"}})))) (is (= ["some-encoded-job"] (:encoded-jobs (specs/validate-req-params - {:jobs "some-encoded-job"})))) + {:jobs "some-encoded-job"})))) (is (= ["some-encoded-job1" "some-encoded-job2"] (:encoded-jobs (specs/validate-req-params - {:jobs ["some-encoded-job1" - "some-encoded-job2"]})))) + {:jobs ["some-encoded-job1" + "some-encoded-job2"]})))) (is (= "some-string-value" (:cron-name (specs/validate-req-params - {:cron-name "some-string-value"})))) + {:cron-name "some-string-value"})))) (is (= nil (:cron-name (specs/validate-req-params {:cron-name :non-string-val})))) (is (= ["string1" "string2"] (:cron-names (specs/validate-req-params - {:cron-names ["string1" "string2"]})))) + {:cron-names ["string1" "string2"]})))) (is (= nil (:cron-names (specs/validate-req-params - {:cron-names [1 2 :non-string "string2"]})))))) + {:cron-names [1 2 :non-string "string2"]})))))) (deftest page-handler-test (testing "Main handler should invoke home-page handler" @@ -213,21 +213,21 @@ (with-redefs [enqueued/get-job (spy/stub {:status 200 :body " Enqueue job UI "})] (console/handler tu/redis-producer (mock/request - :get (str "/enqueued/queue/default/job/" (random-uuid)))) + :get (str "/enqueued/queue/default/job/" (random-uuid)))) (is (true? (spy/called-once? enqueued/get-job))))) (testing "Main handler should invoke prioritise job handler for enqueued jobs page" (with-redefs [enqueued/prioritise-job (spy/stub {:status 302 :body "" :headers {"Location" "/enqueued/queue/test"}})] (console/handler tu/redis-producer (mock/request - :post (str "/enqueued/queue/default/job/" (random-uuid)))) + :post (str "/enqueued/queue/default/job/" (random-uuid)))) (is (true? (spy/called-once? enqueued/prioritise-job))))) (testing "Main handler should invoke delete job handler for enqueued jobs page" (with-redefs [enqueued/delete-job (spy/stub {:status 302 :body "" :headers {"Location" "/enqueued/queue/test"}})] (console/handler tu/redis-producer (mock/request - :delete (str "/enqueued/queue/default/job/" (random-uuid)))) + :delete (str "/enqueued/queue/default/job/" (random-uuid)))) (is (true? (spy/called-once? enqueued/delete-job))))) (testing "Main handler should invoke get-jobs handler for dead jobs page" diff --git a/test/goose/brokers/redis/cron/registry_test.clj b/test/goose/brokers/redis/cron/registry_test.clj index 5a5ced1e..cc14bee8 100644 --- a/test/goose/brokers/redis/cron/registry_test.clj +++ b/test/goose/brokers/redis/cron/registry_test.clj @@ -1,32 +1,32 @@ (ns goose.brokers.redis.cron.registry-test (:require - [goose.api.enqueued-jobs :as enqueued-jobs] - [goose.brokers.redis.cron :as cron] - [goose.cron.parsing :as cron-parsing] - [goose.defaults :as d] - [goose.job :as j] - [goose.retry :as retry] - [goose.test-utils :as tu] - [goose.utils :as u] + [goose.api.enqueued-jobs :as enqueued-jobs] + [goose.brokers.redis.cron :as cron] + [goose.cron.parsing :as cron-parsing] + [goose.defaults :as d] + [goose.job :as j] + [goose.retry :as retry] + [goose.test-utils :as tu] + [goose.utils :as u] - [clojure.test :refer [deftest is testing use-fixtures]]) + [clojure.test :refer [deftest is testing use-fixtures]]) (:import - (java.time ZoneId))) + (java.time ZoneId))) (use-fixtures :each tu/redis-fixture) (defn- after-due-time [cron-schedule timezone] (inc - (cron-parsing/next-run-epoch-ms - cron-schedule timezone))) + (cron-parsing/next-run-epoch-ms + cron-schedule timezone))) (defn- before-due-time [cron-schedule timezone] (dec - (cron-parsing/next-run-epoch-ms - cron-schedule timezone))) + (cron-parsing/next-run-epoch-ms + cron-schedule timezone))) (deftest cron-registration-test (testing "Registering two cron entries with the same name" @@ -72,7 +72,7 @@ retry/default-opts))] (with-redefs [u/epoch-time-ms (constantly - (after-due-time cron-schedule timezone))] + (after-due-time cron-schedule timezone))] (is (= [{:cron-name "my-cron-name" :cron-schedule cron-schedule :timezone timezone @@ -85,7 +85,7 @@ "The cron entry is due after the scheduled cron time")) (with-redefs [u/epoch-time-ms (constantly - (before-due-time cron-schedule timezone))] + (before-due-time cron-schedule timezone))] (is (empty? (cron/due-cron-entries tu/redis-conn)) "The cron entry is not due before the scheduled cron time"))))) @@ -105,17 +105,17 @@ (d/prefix-queue "foo-queue") retry/default-opts)) (with-redefs [u/epoch-time-ms (constantly - (after-due-time cron-schedule timezone)) + (after-due-time cron-schedule timezone)) cron-parsing/next-run-epoch-ms (constantly - (inc - (after-due-time cron-schedule timezone)))] + (inc + (after-due-time cron-schedule timezone)))] (is (cron/enqueue-due-cron-entries tu/redis-conn) "find-and-enqueue-cron-entries returns truthy if due cron entries were found")) (is (empty? - (with-redefs [u/epoch-time-ms (constantly - (after-due-time cron-schedule timezone))] - (cron/due-cron-entries tu/redis-conn))) + (with-redefs [u/epoch-time-ms (constantly + (after-due-time cron-schedule timezone))] + (cron/due-cron-entries tu/redis-conn))) "The cron entry is not immediately due after enqueueing") (is (= {:args [:a "b" 3] diff --git a/test/goose/brokers/redis/integration_test.clj b/test/goose/brokers/redis/integration_test.clj index 9b395cbf..59053267 100644 --- a/test/goose/brokers/redis/integration_test.clj +++ b/test/goose/brokers/redis/integration_test.clj @@ -1,23 +1,23 @@ (ns goose.brokers.redis.integration-test (:require - [goose.batch :as batch] - [goose.brokers.redis.batch :as redis-batch] - [goose.brokers.redis.commands :as redis-cmds] - [goose.brokers.redis.consumer :as redis-consumer] - [goose.client :as c] - [goose.defaults :as d] - [goose.job :as j] - [goose.retry :as retry] - [goose.test-utils :as tu] - [goose.utils :as u] - [goose.worker :as w] + [goose.batch :as batch] + [goose.brokers.redis.batch :as redis-batch] + [goose.brokers.redis.commands :as redis-cmds] + [goose.brokers.redis.consumer :as redis-consumer] + [goose.client :as c] + [goose.defaults :as d] + [goose.job :as j] + [goose.retry :as retry] + [goose.test-utils :as tu] + [goose.utils :as u] + [goose.worker :as w] - [clojure.test :refer [deftest is testing use-fixtures]] - [taoensso.carmine :as car]) + [clojure.test :refer [deftest is testing use-fixtures]] + [taoensso.carmine :as car]) (:import - [clojure.lang ExceptionInfo] - [java.time Instant] - [java.util UUID])) + [clojure.lang ExceptionInfo] + [java.time Instant] + [java.util UUID])) ;;; ======= Setup & Teardown ========== (use-fixtures :each tu/redis-fixture) @@ -98,7 +98,7 @@ (testing "[redis] Goose calls middleware" (reset! middleware-called (promise)) (let [worker (w/start (assoc tu/redis-worker-opts - :middlewares test-middleware)) + :middlewares test-middleware)) _ (c/perform-async tu/redis-client-opts `add-five 5)] (is (= 10 (deref @middleware-called 100 :middleware-test-timed-out))) (w/stop worker)))) @@ -135,10 +135,10 @@ (let [arg "retry-test" retry-opts (assoc retry/default-opts - :max-retries 2 - :retry-delay-sec-fn-sym `immediate-retry - :retry-queue retry-queue - :error-handler-fn-sym `retry-test-error-handler) + :max-retries 2 + :retry-delay-sec-fn-sym `immediate-retry + :retry-queue retry-queue + :error-handler-fn-sym `retry-test-error-handler) _ (c/perform-async (assoc tu/redis-client-opts :retry-opts retry-opts) `erroneous-fn arg) error-svc-cfg :my-retry-test-config worker-opts (assoc tu/redis-worker-opts :error-service-config error-svc-cfg) @@ -174,10 +174,10 @@ (reset! death-error-service (promise)) (reset! dead-job-run-count 0) (let [dead-job-opts (assoc retry/default-opts - :max-retries 1 - :retry-delay-sec-fn-sym `immediate-retry - :error-handler-fn-sym `dead-test-error-handler - :death-handler-fn-sym `dead-test-death-handler) + :max-retries 1 + :retry-delay-sec-fn-sym `immediate-retry + :error-handler-fn-sym `dead-test-error-handler + :death-handler-fn-sym `dead-test-death-handler) _ (c/perform-async (assoc tu/redis-client-opts :retry-opts dead-job-opts) `dead-fn :foo) error-svc-cfg :my-death-test-config worker-opts (assoc tu/redis-worker-opts :error-service-config error-svc-cfg) diff --git a/test/goose/brokers/rmq/api_test.clj b/test/goose/brokers/rmq/api_test.clj index 1ef13600..751d49c5 100644 --- a/test/goose/brokers/rmq/api_test.clj +++ b/test/goose/brokers/rmq/api_test.clj @@ -1,15 +1,15 @@ (ns goose.brokers.rmq.api-test (:require - [goose.api.dead-jobs :as dead-jobs] - [goose.api.enqueued-jobs :as enqueued-jobs] - [goose.client :as c] - [goose.retry :as retry] - [goose.test-utils :as tu] - [goose.worker :as w] + [goose.api.dead-jobs :as dead-jobs] + [goose.api.enqueued-jobs :as enqueued-jobs] + [goose.client :as c] + [goose.retry :as retry] + [goose.test-utils :as tu] + [goose.worker :as w] - [clojure.test :refer [deftest is testing use-fixtures]]) + [clojure.test :refer [deftest is testing use-fixtures]]) (:import - [java.util UUID])) + [java.util UUID])) ;;; ======= Setup & Teardown ========== (use-fixtures :each tu/rmq-fixture) @@ -31,8 +31,8 @@ (testing "[rmq] dead-jobs API" (let [worker (w/start (assoc tu/rmq-worker-opts :threads 1)) retry-opts (assoc retry/default-opts - :max-retries 0 - :death-handler-fn-sym `death-handler) + :max-retries 0 + :death-handler-fn-sym `death-handler) job-opts (assoc tu/rmq-client-opts :retry-opts retry-opts) _ (doseq [id (range 4)] (c/perform-async job-opts `dead-fn id) diff --git a/test/goose/brokers/rmq/console_test.clj b/test/goose/brokers/rmq/console_test.clj index ca1e295f..f7266db1 100644 --- a/test/goose/brokers/rmq/console_test.clj +++ b/test/goose/brokers/rmq/console_test.clj @@ -1,15 +1,15 @@ (ns goose.brokers.rmq.console-test (:require - [clojure.string :as str] - [clojure.test :refer [deftest is testing use-fixtures]] - [goose.brokers.rmq.api.dead-jobs :as dead-jobs] - [goose.brokers.rmq.api.enqueued-jobs :as enqueued-jobs] - [goose.brokers.rmq.console :as console] - [goose.factories :as f] - [goose.test-utils :as tu] - [goose.utils :as u] - [ring.mock.request :as mock] - [spy.core :as spy])) + [clojure.string :as str] + [clojure.test :refer [deftest is testing use-fixtures]] + [goose.brokers.rmq.api.dead-jobs :as dead-jobs] + [goose.brokers.rmq.api.enqueued-jobs :as enqueued-jobs] + [goose.brokers.rmq.console :as console] + [goose.factories :as f] + [goose.test-utils :as tu] + [goose.utils :as u] + [ring.mock.request :as mock] + [spy.core :as spy])) (use-fixtures :each tu/rmq-fixture) diff --git a/test/goose/brokers/rmq/integration_test.clj b/test/goose/brokers/rmq/integration_test.clj index 00dc3ed5..9a5ea96c 100644 --- a/test/goose/brokers/rmq/integration_test.clj +++ b/test/goose/brokers/rmq/integration_test.clj @@ -1,21 +1,20 @@ (ns goose.brokers.rmq.integration-test (:require - [goose.api.enqueued-jobs :as enqueued-jobs] - [goose.brokers.rmq.broker :as rmq] - [goose.brokers.rmq.queue :as rmq-queue] - [goose.client :as c] - [goose.defaults :as d] - [goose.retry :as retry] - [goose.test-utils :as tu] - [goose.worker :as w] - - [clojure.test :refer [deftest is testing use-fixtures]]) + [goose.api.enqueued-jobs :as enqueued-jobs] + [goose.brokers.rmq.broker :as rmq] + [goose.brokers.rmq.queue :as rmq-queue] + [goose.client :as c] + [goose.defaults :as d] + [goose.retry :as retry] + [goose.test-utils :as tu] + [goose.worker :as w] + + [clojure.test :refer [deftest is testing use-fixtures]]) (:import - (clojure.lang ExceptionInfo) - [java.util UUID] - (java.util.concurrent TimeoutException) - (java.time Instant))) - + (clojure.lang ExceptionInfo) + [java.util UUID] + (java.util.concurrent TimeoutException) + (java.time Instant))) ;;; ======= Setup & Teardown ========== (use-fixtures :each tu/rmq-fixture) @@ -55,8 +54,8 @@ consumer (rmq/new-consumer opts) worker-opts (assoc tu/worker-opts - :broker consumer - :queue queue) + :broker consumer + :queue queue) _ (is (uuid? (UUID/fromString (:id (c/perform-async client-opts `quorum-fn arg))))) worker (w/start worker-opts)] @@ -95,10 +94,10 @@ (testing "[rmq] Scheduling beyond max_delay limit" (is - (thrown-with-msg? - ExceptionInfo - #"MAX_DELAY limit breached*" - (c/perform-in-sec tu/rmq-client-opts 4294968 `perform-in-sec-fn))))) + (thrown-with-msg? + ExceptionInfo + #"MAX_DELAY limit breached*" + (c/perform-in-sec tu/rmq-client-opts 4294968 `perform-in-sec-fn))))) ;;; ======= TEST: Publisher Confirms ======= (def ack-channel-number (atom (promise))) @@ -113,24 +112,24 @@ ;; Remove this test if it happens often. (testing "[rmq][sync-confirms] Publish timed out" (let [opts (assoc tu/rmq-opts - :publisher-confirms {:strategy d/sync-confirms :timeout-ms 1}) + :publisher-confirms {:strategy d/sync-confirms :timeout-ms 1}) producer (rmq/new-producer opts 1) client-opts {:queue "sync-publisher-confirms-test" :retry-opts retry/default-opts :broker producer}] (is - (thrown? - TimeoutException - (c/perform-async client-opts `tu/my-fn))) + (thrown? + TimeoutException + (c/perform-async client-opts `tu/my-fn))) (rmq/close producer))) (testing "[rmq][async-confirms] Ack handler called" (reset! ack-channel-number (promise)) (reset! ack-delivery-tag (promise)) (let [opts (assoc tu/rmq-opts - :publisher-confirms {:strategy d/async-confirms - :ack-handler test-ack-handler - :nack-handler test-nack-handler}) + :publisher-confirms {:strategy d/async-confirms + :ack-handler test-ack-handler + :nack-handler test-nack-handler}) ;; Create multiple channels to test correctness of delivery-tag & channel-number. producer (rmq/new-producer opts 5) client-opts {:queue "async-publisher-confirms-test" @@ -173,7 +172,7 @@ (testing "[rmq] Goose calls middleware & attaches RMQ metadata to opts" (reset! middleware-called (promise)) (let [worker (w/start (assoc tu/rmq-worker-opts - :middlewares test-middleware)) + :middlewares test-middleware)) _ (c/perform-async tu/rmq-client-opts `tu/my-fn :arg1)] (is (= d/content-type (:content-type (deref @middleware-called 100 :middleware-test-timed-out)))) (w/stop worker)))) @@ -209,10 +208,10 @@ (let [arg "retry-test" retry-opts (assoc retry/default-opts - :max-retries 2 - :retry-delay-sec-fn-sym `immediate-retry - :retry-queue retry-queue - :error-handler-fn-sym `retry-test-error-handler) + :max-retries 2 + :retry-delay-sec-fn-sym `immediate-retry + :retry-queue retry-queue + :error-handler-fn-sym `retry-test-error-handler) _ (c/perform-async (assoc tu/rmq-client-opts :retry-opts retry-opts) `erroneous-fn arg) error-svc-cfg :my-retry-test-config worker-opts (assoc tu/rmq-worker-opts :error-service-config error-svc-cfg) @@ -249,10 +248,10 @@ (reset! death-error-service (promise)) (reset! dead-job-run-count 0) (let [dead-job-opts (assoc retry/default-opts - :max-retries 1 - :retry-delay-sec-fn-sym `immediate-retry - :error-handler-fn-sym `dead-test-error-handler - :death-handler-fn-sym `dead-test-death-handler) + :max-retries 1 + :retry-delay-sec-fn-sym `immediate-retry + :error-handler-fn-sym `dead-test-error-handler + :death-handler-fn-sym `dead-test-death-handler) _ (c/perform-async (assoc tu/rmq-client-opts :retry-opts dead-job-opts) `dead-fn) error-svc-cfg :my-death-test-config worker-opts (assoc tu/rmq-worker-opts :error-service-config error-svc-cfg) diff --git a/test/goose/console_test.clj b/test/goose/console_test.clj index 8f43afb5..95d1c79f 100644 --- a/test/goose/console_test.clj +++ b/test/goose/console_test.clj @@ -55,10 +55,10 @@ (deftest handler-test (let [req-with-client-opts (assoc (mock/request :get "foo/") - :console-opts {:broker tu/redis-producer - :app-name "" - :route-prefix "foo"} - :prefix-route (partial str "foo"))] + :console-opts {:broker tu/redis-producer + :app-name "" + :route-prefix "foo"} + :prefix-route (partial str "foo"))] (testing "Should call redis-handler given redis-broker to broker/handler" (with-redefs [redis-console/handler (spy/spy redis-console/handler)] (is (true? (spy/not-called? redis-console/handler))) @@ -68,8 +68,8 @@ (with-redefs [rmq-console/handler (spy/spy rmq-console/handler)] (is (true? (spy/not-called? rmq-console/handler))) (broker/handler tu/rmq-producer (assoc-in req-with-client-opts - [:console-opts - :broker] tu/rmq-producer)) + [:console-opts + :broker] tu/rmq-producer)) (is (true? (spy/called? rmq-console/handler))))) (testing "Should throw IllegalArgumentException given invalid broker" (is (thrown? IllegalArgumentException (broker/handler {:broker :invalid-broker} req-with-client-opts)))))) diff --git a/test/goose/factories.clj b/test/goose/factories.clj index d9571461..bcee5333 100644 --- a/test/goose/factories.clj +++ b/test/goose/factories.clj @@ -51,7 +51,7 @@ (:id j))) (defn create-jobs-in-redis [{:keys [enqueued scheduled cron dead] - :or {enqueued 0 scheduled 0 cron 0 dead 0}} & [overrides]] + :or {enqueued 0 scheduled 0 cron 0 dead 0}} & [overrides]] (let [apply-fn-n-times (fn [n f & args] (dotimes [_ n] (apply f args)))] (apply-fn-n-times enqueued create-async-job-in-redis (:enqueued overrides)) diff --git a/test/goose/specs_test.clj b/test/goose/specs_test.clj index 3a9692e9..26a3f46b 100644 --- a/test/goose/specs_test.clj +++ b/test/goose/specs_test.clj @@ -1,24 +1,24 @@ (ns goose.specs-test (:require - [clojure.spec.alpha :as s] - [clojure.test :refer [are deftest is]] - [goose.batch :as batch] - [goose.brokers.redis.broker :as redis] - [goose.brokers.rmq.broker :as rmq] - [goose.client :as c] - [goose.console :as console] - [goose.defaults :as d] - [goose.metrics.statsd :as statsd] - [goose.specs :as specs] - [goose.test-utils :as tu] - [goose.utils :as u] - - [goose.worker :as w]) + [clojure.spec.alpha :as s] + [clojure.test :refer [are deftest is]] + [goose.batch :as batch] + [goose.brokers.redis.broker :as redis] + [goose.brokers.rmq.broker :as rmq] + [goose.client :as c] + [goose.console :as console] + [goose.defaults :as d] + [goose.metrics.statsd :as statsd] + [goose.specs :as specs] + [goose.test-utils :as tu] + [goose.utils :as u] + + [goose.worker :as w]) (:import - (clojure.lang ExceptionInfo) - (java.time Instant) - (java.util HashMap) - (tech.v3.datatype FastStruct))) + (clojure.lang ExceptionInfo) + (java.time Instant) + (java.util HashMap) + (tech.v3.datatype FastStruct))) (defn single-arity-fn [_] "dummy") (def now (Instant/now)) @@ -41,12 +41,12 @@ (deftest specs-test (specs/instrument) (are [sut] - (is + (is ;; When specs are instrumented, expect exceptions for incorrect parameters. - (thrown-with-msg? - ExceptionInfo - #"Call to goose.* did not conform to spec." - (sut))) + (thrown-with-msg? + ExceptionInfo + #"Call to goose.* did not conform to spec." + (sut))) ;; Client specs ;; :execute-fn-sym diff --git a/test/goose/test_utils.clj b/test/goose/test_utils.clj index 8d999ea7..b1c7045d 100644 --- a/test/goose/test_utils.clj +++ b/test/goose/test_utils.clj @@ -1,28 +1,28 @@ (ns goose.test-utils (:require - [goose.brokers.redis.broker :as redis] - [goose.brokers.redis.commands :as redis-cmds] - [goose.brokers.rmq.broker :as rmq] - [goose.brokers.rmq.publisher-confirms :as rmq-publisher-confirms] - [goose.brokers.rmq.queue :as rmq-queue] - [goose.brokers.rmq.return-listener :as return-listener] - [goose.brokers.rmq.shutdown-listener :as shutdown-listener] - [goose.defaults :as d] - [goose.metrics.statsd :as statsd] - [goose.retry :as retry] - [goose.specs :as specs] - [goose.utils :as u] - - [langohr.queue :as lq] - [taoensso.carmine :as car])) + [goose.brokers.redis.broker :as redis] + [goose.brokers.redis.commands :as redis-cmds] + [goose.brokers.rmq.broker :as rmq] + [goose.brokers.rmq.publisher-confirms :as rmq-publisher-confirms] + [goose.brokers.rmq.queue :as rmq-queue] + [goose.brokers.rmq.return-listener :as return-listener] + [goose.brokers.rmq.shutdown-listener :as shutdown-listener] + [goose.defaults :as d] + [goose.metrics.statsd :as statsd] + [goose.retry :as retry] + [goose.specs :as specs] + [goose.utils :as u] + + [langohr.queue :as lq] + [taoensso.carmine :as car])) (defn my-fn [arg & _] arg) (def queue "test") (defn no-op-error-handler [_ _ _]) (def retry-opts (assoc retry/default-opts - :error-handler-fn-sym `no-op-error-handler - :death-handler-fn-sym `no-op-error-handler)) + :error-handler-fn-sym `no-op-error-handler + :death-handler-fn-sym `no-op-error-handler)) (def client-opts {:queue queue :retry-opts retry-opts}) @@ -59,7 +59,6 @@ :app-name "" :route-prefix ""}) - ;; RMQ --------- (def rmq-url (let [host (or (System/getenv "GOOSE_TEST_RABBITMQ_HOST") "localhost") @@ -80,7 +79,7 @@ (def rmq-console-opts {:broker rmq-producer :app-name "" - :route-prefix "" }) + :route-prefix ""}) (defn rmq-delete-test-queues [] diff --git a/test/goose/worker_test.clj b/test/goose/worker_test.clj index d3842fc4..e36ba392 100644 --- a/test/goose/worker_test.clj +++ b/test/goose/worker_test.clj @@ -1,10 +1,10 @@ (ns goose.worker-test (:require - [goose.client :as c] - [goose.test-utils :as tu] - [goose.worker :as w] + [goose.client :as c] + [goose.test-utils :as tu] + [goose.worker :as w] - [clojure.test :refer [deftest is testing use-fixtures]])) + [clojure.test :refer [deftest is testing use-fixtures]])) ;;; ======= Setup & Teardown ========== (use-fixtures :each tu/redis-fixture) diff --git a/test/test_runner.clj b/test/test_runner.clj index 4ab035bc..7a06d9db 100644 --- a/test/test_runner.clj +++ b/test/test_runner.clj @@ -1,8 +1,8 @@ (ns test-runner (:require - [goose.test-utils :as tu] + [goose.test-utils :as tu] - [cognitect.test-runner.api :as test-runner])) + [cognitect.test-runner.api :as test-runner])) (defn test-and-shutdown [args]