Skip to content
This repository has been archived by the owner on May 15, 2024. It is now read-only.

Commit

Permalink
Merge pull request #7 from FarmLogs/reliability-testing
Browse files Browse the repository at this point in the history
Reliability testing
  • Loading branch information
briprowe committed May 27, 2016
2 parents d6058d4 + c8a2a46 commit 9e9d369
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 6 deletions.
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject com.farmlogs.conduit "0.1.0"
(defproject com.farmlogs.conduit "0.1.1"
:description "Provides reliable publishing via RMQ."
:license {:name "The MIT License (MIT)"
:url "https://opensource.org/licenses/MIT"}
Expand Down
5 changes: 3 additions & 2 deletions src/com/farmlogs/conduit/subscription.clj
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@
(log/infof "Consumer tag: %s on queue: %s shutdown" consumer-tag queue-name))

(defn- make-channel
[conn {:keys [exchange-name exchange-type] :as config}]
[conn prefetch-count {:keys [exchange-name exchange-type] :as config}]
(let [chan (rmq.chan/open conn)]
(rmq.basic/qos chan prefetch-count)
(rmq.exch/declare chan exchange-name exchange-type config)
(rmq.queue/declare chan (:queue-name config) config)
(rmq.queue/bind chan (:queue-name config) exchange-name config)
Expand All @@ -60,7 +61,7 @@
component/Lifecycle
(start [this]
(log/infof "Starting subscription on queue '%s'" (:queue-name queue-config))
(let [rmq-chan (or rmq-chan (make-channel (:conn rmq-connection) queue-config))
(let [rmq-chan (or rmq-chan (make-channel (:conn rmq-connection) buffer-size queue-config))
new-messages (a/chan buffer-size)
pending-messages (a/chan buffer-size)
ack-process (->ack-process new-messages buffer-size rmq-chan)
Expand Down
6 changes: 4 additions & 2 deletions src/com/farmlogs/conduit/subscription/ack_process.clj
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@

;; A result arrived
:else
(do (a/>! io-chan [event (get pending chan)])
(recur (dissoc pending chan)))))
(do
(log/trace "acking:" (get-in pending [chan :delivery-tag]))
(a/>! io-chan [event (get pending chan)])
(recur (dissoc pending chan)))))
(do
;; Shutdown responder thread.
(a/close! io-chan)
Expand Down
2 changes: 1 addition & 1 deletion test/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@

<!-- For loggers in the these namespaces, log at all levels. -->
<logger name="user" level="ALL" />
<logger name="com.farmlogs" level="ALL" />
<logger name="com.farmlogs" level="DEBUG" />

</configuration>

0 comments on commit 9e9d369

Please sign in to comment.