Skip to content
This repository has been archived by the owner on Nov 20, 2020. It is now read-only.

Commit

Permalink
fix producer consumer test
Browse files Browse the repository at this point in the history
  • Loading branch information
eulerfx committed Mar 27, 2018
1 parent 7ea8562 commit daa4d4d
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 47 deletions.
8 changes: 4 additions & 4 deletions tests/kafunk.Tests/AsyncTest.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ let chooseTaskOrAsync2 (t:Task<'a>) (a:Async<'a>) : Async<'a> = async {
use _reg = cts.Token.Register (fun () -> tcs.TrySetCanceled () |> ignore)
IVar.intoCancellationToken cts tcs
//let opts = TaskContinuationOptions.DenyChildAttach
let t = Task.Factory.ContinueWhenAny([|t|], (fun (t:Task<_>) -> tcs.TrySetResult t.Result |> ignore), cts.Token)
//t.ContinueWith((fun (t:Task<'a>) -> tcs.TrySetResult t.Result |> ignore ; cts.Cancel ()), opts) |> ignore
//let t = Task.Factory.ContinueWhenAny([|t|], (fun (t:Task<_>) -> tcs.TrySetResult t.Result |> ignore), cts.Token)
let _t = t.ContinueWith((fun (t:Task<'a>) -> tcs.TrySetResult t.Result |> ignore ; cts.Cancel ()), cts.Token)
let a = async {
try
let! a = a
Expand Down Expand Up @@ -78,7 +78,7 @@ let cancelWithToken (ct:CancellationToken) (a:Async<'a>) : Async<'a option> = as
let! ct2 = Async.CancellationToken
use cts = CancellationTokenSource.CreateLinkedTokenSource (ct, ct2)
let tcs = new TaskCompletionSource<'a option>()
use _reg = cts.Token.Register (fun () -> tcs.TrySetResult None |> ignore)
let _reg = cts.Token.Register (fun () -> tcs.TrySetResult None |> ignore)
let a = async {
try
let! a = a
Expand Down Expand Up @@ -123,4 +123,4 @@ let go3 = async {
|> Async.parallelThrottledIgnore 1000
}

Async.RunSynchronously go3
Async.RunSynchronously go2
90 changes: 47 additions & 43 deletions tests/kafunk.Tests/ProducerConsumer.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,30 @@ let messageKeyBytes = messageKey |> System.Text.Encoding.UTF8.GetBytes |> Binary

let chanConfig =
ChanConfig.create (
requestTimeout = TimeSpan.FromSeconds 10.0,
connectRetryPolicy = RetryPolicy.none)
requestTimeout = TimeSpan.FromSeconds 10.0)

let consuming = new CountdownEvent(consumerCount)
let completed = IVar.create ()
let sw = System.Diagnostics.Stopwatch.StartNew()


//type Value = int

//type State =
// | Produced of DateTime
// | Received of DateTime * DateTime

type Ack () =

/// maps values to their state (prepared | ack'd)
let pending = SortedList<int, int>(Comparer.Default)
let mutable contig = -1
let mutable duplicates = 0
let mutable received = 0
let mutable sent = 0

/// Find the longest contiguous sequence starting at the specified index.
/// Returns the largest contiguous value.
let rec findContig i prev =
if i = pending.Count then prev else
let v = pending.Keys.[i]
Expand All @@ -53,6 +61,7 @@ type Ack () =
else
prev

/// Clears the contiguous sequence up to the specified value.
let pruneUpTo v =
let i = pending.IndexOfKey v
if i > 0 then
Expand All @@ -65,25 +74,32 @@ type Ack () =
member __.Received = received
member __.Pending = pending.Count

/// Marks a sequence of items as pending and awaiting acknowledgement.
member __.Prepare (vs:int seq) =
for v in vs do
pending.Add (v, 0)
sent <- sent + 1

member __.Ack (vs:int seq) =
/// Marks a set of items as acknowledged, keeping track of duplicates.
/// Also maintains a contiguous sequence of items to verify ordering.
member __.Ack (vs:int seq) =
for v in vs do
//if v <= contig then
// duplicates <- duplicates + 1
//else
let mutable s = Unchecked.defaultof<_>
if (pending.TryGetValue (v, &s)) then
if s = 1 then
duplicates <- duplicates + 1
else
pending.[v] <- 1
received <- received + 1
let mutable s = Unchecked.defaultof<_>
if (pending.TryGetValue (v, &s)) then
if s = 1 then
// already ack'd
duplicates <- duplicates + 1
else
failwithf "invalid state=%i" v
// first time ack
pending.[v] <- 1
received <- received + 1
else
let first,last =
if pending.Count > 0 then pending.Keys.[0], pending.Keys.[pending.Keys.Count - 1]
else -1,-1
if first < v then
failwithf "ack v=%i is not in pending list and greater than prune watermark first=%i last=%i" v first last

let wm = contig
let wm' = findContig 0 wm
if wm' > wm then
Expand All @@ -97,22 +113,19 @@ type Report =
val received : int
val duplicates : int
val produced : int
val skipped : int
val contigCount : int
val lastContigOffset : (Partition * Offset) option
val offsets : Map<Partition, Offset>
new (r,d,p,s,cc,lco,os) =
{ received = r ; duplicates = d ; produced = p ; skipped = s ; contigCount = cc ; lastContigOffset = lco ; offsets = os }
new (r,d,p,cc) =
{ received = r ; duplicates = d ; produced = p ; contigCount = cc }
end


let printReport (report:Report) =
let pending = totalMessageCount - report.received
let lag = report.produced - report.received
let offsetStr = report.offsets |> Seq.map (fun kvp -> sprintf "p=%i o=%i" kvp.Key kvp.Value) |> String.concat " ; "
let contigDelta = report.received - report.contigCount
Log.info "monitor|produced=%i received=%i lag=%i duplicates=%i pending=%i contig=%i contig_delta=%i last_contig_offset=%A offsets=[%s] running_time_min=%f"
report.produced report.received lag report.duplicates pending report.contigCount contigDelta report.lastContigOffset offsetStr sw.Elapsed.TotalMinutes
//let offsetStr = report.offsets |> Seq.map (fun kvp -> sprintf "p=%i o=%i" kvp.Key kvp.Value) |> String.concat " ; "
let contigDelta = report.received - report.contigCount - 1
Log.info "monitor|produced=%i received=%i lag=%i duplicates=%i pending=%i contig=%i contig_delta=%i running_time_min=%f"
report.produced report.received lag report.duplicates pending report.contigCount contigDelta sw.Elapsed.TotalMinutes



Expand All @@ -135,31 +148,25 @@ module Reporter =
let mb = Mb.Start (fun mb ->

let ack = Ack ()
let skipped = ref 0
let offsets = ref Map.empty

mb.Error.Add (fun e -> Log.error "mailbox_error|%O" e)
mb.Error.Add (fun e -> Log.error "mailbox_error|%O" e ; completed.TrySetException e |> ignore)

let report () =
let r = new Report(ack.Received, ack.Duplicates, ack.Sent, !skipped, ack.Contig, None, !offsets)
let r = new Report(ack.Received, ack.Duplicates, ack.Sent, ack.Contig)
printReport r
Log.info "pending=%i" ack.Pending
r

let rec loop () = async {
let! req = mb.Receive ()
match req with
| Received (values,messageBatchCount) ->
| Received (values,_messageBatchCount) ->

ack.Ack (values |> Seq.map fst)

Interlocked.Add(skipped, messageBatchCount - values.Length) |> ignore

if ack.Received >= totalMessageCount then
Log.info "received_complete_set|receive_count=%i" ack.Contig
IVar.tryPut () completed |> ignore


| Produced (values,p,o) ->
ack.Prepare values

Expand Down Expand Up @@ -189,7 +196,7 @@ let monitor = async {
do! Async.Sleep 5000
let! report = Reporter.report reporter
printReport report
if (report.received - report.contigCount) > 1000000 then
if (report.received - report.contigCount) > 100000 then
Log.error "contig_delta_surpassed_threshold"
IVar.tryPut () completed |> ignore }

Expand Down Expand Up @@ -218,7 +225,7 @@ let producer = async {
ProducerConfig.create (
topic = topicName,
partition = Partitioner.roundRobin,
requiredAcks = RequiredAcks.Local,
requiredAcks = RequiredAcks.AllInSync,
batchSizeBytes = ProducerConfig.DefaultBatchSizeBytes,
bufferSizeBytes = ProducerConfig.DefaultBufferSizeBytes)

Expand All @@ -227,13 +234,11 @@ let producer = async {
let produceProcess =
Seq.init batchCount id
|> Seq.map (fun batchNumber -> async {
try
let batch = Array.init batchSize (fun j -> (batchNumber * batchSize + j))
let pms = batch |> Array.map message
let! res = Producer.produceBatch producer (fun pc -> batchNumber % pc,pms)
Reporter.produced reporter (batch,res.partition,res.offset)
with ex ->
Log.error "produce_error|error=%O" ex })
let batch = Array.init batchSize (fun j -> (batchNumber * batchSize + j))
let pms = batch |> Array.map message
Reporter.produced reporter (batch,0,0L)
let! res = Producer.produceBatch producer (fun pc -> batchNumber % pc,pms)
return () })
|> Async.parallelThrottledIgnore producerThreads

return! Async.choose (IVar.get completed) produceProcess
Expand Down Expand Up @@ -265,8 +270,7 @@ let consumer = async {
let connCfg =
KafkaConfig.create (
[KafkaUri.parse host],
tcpConfig = chanConfig,
bootstrapConnectRetryPolicy = (RetryPolicy.constantMs 1000 |> RetryPolicy.maxAttempts 2))
tcpConfig = chanConfig)
use! conn = Kafka.connAsync connCfg

let consumerCfg =
Expand Down

0 comments on commit daa4d4d

Please sign in to comment.