diff --git a/tests/kafunk.Tests/AsyncTest.fsx b/tests/kafunk.Tests/AsyncTest.fsx index 555aa28..575ec2f 100644 --- a/tests/kafunk.Tests/AsyncTest.fsx +++ b/tests/kafunk.Tests/AsyncTest.fsx @@ -40,8 +40,8 @@ let chooseTaskOrAsync2 (t:Task<'a>) (a:Async<'a>) : Async<'a> = async { use _reg = cts.Token.Register (fun () -> tcs.TrySetCanceled () |> ignore) IVar.intoCancellationToken cts tcs //let opts = TaskContinuationOptions.DenyChildAttach - let t = Task.Factory.ContinueWhenAny([|t|], (fun (t:Task<_>) -> tcs.TrySetResult t.Result |> ignore), cts.Token) - //t.ContinueWith((fun (t:Task<'a>) -> tcs.TrySetResult t.Result |> ignore ; cts.Cancel ()), opts) |> ignore + //let t = Task.Factory.ContinueWhenAny([|t|], (fun (t:Task<_>) -> tcs.TrySetResult t.Result |> ignore), cts.Token) + let _t = t.ContinueWith((fun (t:Task<'a>) -> tcs.TrySetResult t.Result |> ignore ; cts.Cancel ()), cts.Token) let a = async { try let! a = a @@ -78,7 +78,7 @@ let cancelWithToken (ct:CancellationToken) (a:Async<'a>) : Async<'a option> = as let! ct2 = Async.CancellationToken use cts = CancellationTokenSource.CreateLinkedTokenSource (ct, ct2) let tcs = new TaskCompletionSource<'a option>() - use _reg = cts.Token.Register (fun () -> tcs.TrySetResult None |> ignore) + let _reg = cts.Token.Register (fun () -> tcs.TrySetResult None |> ignore) let a = async { try let! a = a @@ -123,4 +123,4 @@ let go3 = async { |> Async.parallelThrottledIgnore 1000 } -Async.RunSynchronously go3 \ No newline at end of file +Async.RunSynchronously go2 \ No newline at end of file diff --git a/tests/kafunk.Tests/ProducerConsumer.fsx b/tests/kafunk.Tests/ProducerConsumer.fsx index 1f47548..576a26e 100644 --- a/tests/kafunk.Tests/ProducerConsumer.fsx +++ b/tests/kafunk.Tests/ProducerConsumer.fsx @@ -28,22 +28,30 @@ let messageKeyBytes = messageKey |> System.Text.Encoding.UTF8.GetBytes |> Binary let chanConfig = ChanConfig.create ( - requestTimeout = TimeSpan.FromSeconds 10.0, - connectRetryPolicy = RetryPolicy.none) + requestTimeout = TimeSpan.FromSeconds 10.0) let consuming = new CountdownEvent(consumerCount) let completed = IVar.create () let sw = System.Diagnostics.Stopwatch.StartNew() +//type Value = int + +//type State = +// | Produced of DateTime +// | Received of DateTime * DateTime + type Ack () = + /// maps values to their state (prepared | ack'd) let pending = SortedList(Comparer.Default) let mutable contig = -1 let mutable duplicates = 0 let mutable received = 0 let mutable sent = 0 + /// Find the longest contiguous sequence starting at the specified index. + /// Returns the largest contiguous value. let rec findContig i prev = if i = pending.Count then prev else let v = pending.Keys.[i] @@ -53,6 +61,7 @@ type Ack () = else prev + /// Clears the contiguous sequence up to the specified value. let pruneUpTo v = let i = pending.IndexOfKey v if i > 0 then @@ -65,25 +74,32 @@ type Ack () = member __.Received = received member __.Pending = pending.Count + /// Marks a sequence of items as pending and awaiting acknowledgement. member __.Prepare (vs:int seq) = for v in vs do pending.Add (v, 0) sent <- sent + 1 - member __.Ack (vs:int seq) = + /// Marks a set of items as acknowledged, keeping track of duplicates. + /// Also maintains a contiguous sequence of items to verify ordering. + member __.Ack (vs:int seq) = for v in vs do - //if v <= contig then - // duplicates <- duplicates + 1 - //else - let mutable s = Unchecked.defaultof<_> - if (pending.TryGetValue (v, &s)) then - if s = 1 then - duplicates <- duplicates + 1 - else - pending.[v] <- 1 - received <- received + 1 + let mutable s = Unchecked.defaultof<_> + if (pending.TryGetValue (v, &s)) then + if s = 1 then + // already ack'd + duplicates <- duplicates + 1 else - failwithf "invalid state=%i" v + // first time ack + pending.[v] <- 1 + received <- received + 1 + else + let first,last = + if pending.Count > 0 then pending.Keys.[0], pending.Keys.[pending.Keys.Count - 1] + else -1,-1 + if first < v then + failwithf "ack v=%i is not in pending list and greater than prune watermark first=%i last=%i" v first last + let wm = contig let wm' = findContig 0 wm if wm' > wm then @@ -97,22 +113,19 @@ type Report = val received : int val duplicates : int val produced : int - val skipped : int val contigCount : int - val lastContigOffset : (Partition * Offset) option - val offsets : Map - new (r,d,p,s,cc,lco,os) = - { received = r ; duplicates = d ; produced = p ; skipped = s ; contigCount = cc ; lastContigOffset = lco ; offsets = os } + new (r,d,p,cc) = + { received = r ; duplicates = d ; produced = p ; contigCount = cc } end let printReport (report:Report) = let pending = totalMessageCount - report.received let lag = report.produced - report.received - let offsetStr = report.offsets |> Seq.map (fun kvp -> sprintf "p=%i o=%i" kvp.Key kvp.Value) |> String.concat " ; " - let contigDelta = report.received - report.contigCount - Log.info "monitor|produced=%i received=%i lag=%i duplicates=%i pending=%i contig=%i contig_delta=%i last_contig_offset=%A offsets=[%s] running_time_min=%f" - report.produced report.received lag report.duplicates pending report.contigCount contigDelta report.lastContigOffset offsetStr sw.Elapsed.TotalMinutes + //let offsetStr = report.offsets |> Seq.map (fun kvp -> sprintf "p=%i o=%i" kvp.Key kvp.Value) |> String.concat " ; " + let contigDelta = report.received - report.contigCount - 1 + Log.info "monitor|produced=%i received=%i lag=%i duplicates=%i pending=%i contig=%i contig_delta=%i running_time_min=%f" + report.produced report.received lag report.duplicates pending report.contigCount contigDelta sw.Elapsed.TotalMinutes @@ -135,31 +148,25 @@ module Reporter = let mb = Mb.Start (fun mb -> let ack = Ack () - let skipped = ref 0 - let offsets = ref Map.empty - mb.Error.Add (fun e -> Log.error "mailbox_error|%O" e) + mb.Error.Add (fun e -> Log.error "mailbox_error|%O" e ; completed.TrySetException e |> ignore) let report () = - let r = new Report(ack.Received, ack.Duplicates, ack.Sent, !skipped, ack.Contig, None, !offsets) + let r = new Report(ack.Received, ack.Duplicates, ack.Sent, ack.Contig) printReport r - Log.info "pending=%i" ack.Pending r let rec loop () = async { let! req = mb.Receive () match req with - | Received (values,messageBatchCount) -> + | Received (values,_messageBatchCount) -> ack.Ack (values |> Seq.map fst) - Interlocked.Add(skipped, messageBatchCount - values.Length) |> ignore - if ack.Received >= totalMessageCount then Log.info "received_complete_set|receive_count=%i" ack.Contig IVar.tryPut () completed |> ignore - | Produced (values,p,o) -> ack.Prepare values @@ -189,7 +196,7 @@ let monitor = async { do! Async.Sleep 5000 let! report = Reporter.report reporter printReport report - if (report.received - report.contigCount) > 1000000 then + if (report.received - report.contigCount) > 100000 then Log.error "contig_delta_surpassed_threshold" IVar.tryPut () completed |> ignore } @@ -218,7 +225,7 @@ let producer = async { ProducerConfig.create ( topic = topicName, partition = Partitioner.roundRobin, - requiredAcks = RequiredAcks.Local, + requiredAcks = RequiredAcks.AllInSync, batchSizeBytes = ProducerConfig.DefaultBatchSizeBytes, bufferSizeBytes = ProducerConfig.DefaultBufferSizeBytes) @@ -227,13 +234,11 @@ let producer = async { let produceProcess = Seq.init batchCount id |> Seq.map (fun batchNumber -> async { - try - let batch = Array.init batchSize (fun j -> (batchNumber * batchSize + j)) - let pms = batch |> Array.map message - let! res = Producer.produceBatch producer (fun pc -> batchNumber % pc,pms) - Reporter.produced reporter (batch,res.partition,res.offset) - with ex -> - Log.error "produce_error|error=%O" ex }) + let batch = Array.init batchSize (fun j -> (batchNumber * batchSize + j)) + let pms = batch |> Array.map message + Reporter.produced reporter (batch,0,0L) + let! res = Producer.produceBatch producer (fun pc -> batchNumber % pc,pms) + return () }) |> Async.parallelThrottledIgnore producerThreads return! Async.choose (IVar.get completed) produceProcess @@ -265,8 +270,7 @@ let consumer = async { let connCfg = KafkaConfig.create ( [KafkaUri.parse host], - tcpConfig = chanConfig, - bootstrapConnectRetryPolicy = (RetryPolicy.constantMs 1000 |> RetryPolicy.maxAttempts 2)) + tcpConfig = chanConfig) use! conn = Kafka.connAsync connCfg let consumerCfg =