Skip to content
This repository has been archived by the owner on Nov 20, 2020. It is now read-only.

Commit

Permalink
Merge pull request #217 from eulerfx/mem
Browse files Browse the repository at this point in the history
Fix memory leak in 0.1.15
  • Loading branch information
eulerfx authored Mar 27, 2018
2 parents a449ead + e256e75 commit f59d47b
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 162 deletions.
6 changes: 6 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
### 0.1.16 - 27.3.2018
* Tested 0.1.16-alpha01

### 0.1.16-alpha01 - 27.3.2018
* BUG: memory leak in 0.1.15.

### 0.1.15 - 19.3.2018
* BREAKING: `FetchResponse` item now explicit structure rather than tuple. Breaking only if using low-level Fetch API.

Expand Down
8 changes: 4 additions & 4 deletions src/kafunk/AssemblyInfo.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ open System.Reflection
[<assembly: AssemblyTitleAttribute("kafunk")>]
[<assembly: AssemblyProductAttribute("kafunk")>]
[<assembly: AssemblyDescriptionAttribute("F# client for Kafka")>]
[<assembly: AssemblyVersionAttribute("0.1.15")>]
[<assembly: AssemblyFileVersionAttribute("0.1.15")>]
[<assembly: AssemblyVersionAttribute("0.1.16")>]
[<assembly: AssemblyFileVersionAttribute("0.1.16")>]
do ()

module internal AssemblyVersionInformation =
let [<Literal>] AssemblyTitle = "kafunk"
let [<Literal>] AssemblyProduct = "kafunk"
let [<Literal>] AssemblyDescription = "F# client for Kafka"
let [<Literal>] AssemblyVersion = "0.1.15"
let [<Literal>] AssemblyFileVersion = "0.1.15"
let [<Literal>] AssemblyVersion = "0.1.16"
let [<Literal>] AssemblyFileVersion = "0.1.16"
2 changes: 1 addition & 1 deletion src/kafunk/Tcp.fs
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ type ReqRepSession<'a, 'b, 's> internal
do! receive |> AsyncSeq.iter __.Demux
//Log.info "session_closed2|remote_endpoint=%O" remoteEndpoint
with ex ->
Log.error "session_exception|remote_endpoint=%O error=\"%O\"" remoteEndpoint ex
Log.warn "session_exception|remote_endpoint=%O error=\"%O\"" remoteEndpoint ex
__.CancelAll (Some ex)
return raise ex }

Expand Down
82 changes: 1 addition & 81 deletions src/kafunk/Utility/Async.fs
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,6 @@ module IVar =
let inline get (i:IVar<'a>) : Async<'a> =
i.Task |> awaitTaskCancellationAsError

// /// Creates an async computation which returns the value contained in an IVar.
// let inline getWithTimeout (timeout:TimeSpan) (timeoutResult:unit -> 'a) (i:IVar<'a>) : Async<'a> = async {
// let! ct = Async.CancellationToken
// (Task.Delay (timeout, ct)).ContinueWith (Func<_,_>(fun _ -> tryPut (timeoutResult ()) i))
// |> ignore
// return! i.Task |> awaitTaskCancellationAsError }

/// Creates an async computation which returns the value contained in an IVar.
let inline getWithTimeout (timeout:TimeSpan) (timeoutResult:unit -> 'a) (i:IVar<'a>) : Async<'a> = async {
use _timer = new Timer((fun _ -> tryPut (timeoutResult ()) i |> ignore), null, (int timeout.TotalMilliseconds), Timeout.Infinite)
Expand Down Expand Up @@ -312,37 +305,6 @@ module Async =
startThreadPoolWithContinuations (a, ok, err, cnc, cts.Token)
startThreadPoolWithContinuations (b, ok, err, cnc, cts.Token) }

let choose2 (a:Async<'a>) (b:Async<'a>) : Async<'a> = async {
let! ct = Async.CancellationToken
let cts = CancellationTokenSource.CreateLinkedTokenSource ct
let res = IVar.create ()
IVar.intoCancellationToken cts res
let inline ok a = IVar.tryPut a res |> ignore
let inline err e = IVar.tryError e res |> ignore
let inline cnc (_:OperationCanceledException) = IVar.tryCancel res |> ignore
startThreadPoolWithContinuations (a, ok, err, cnc, cts.Token)
startThreadPoolWithContinuations (b, ok, err, cnc, cts.Token)
return! IVar.get res }

let chooseChoice (a:Async<'a>) (b:Async<'b>) : Async<Choice<'a, 'b>> =
choose (a |> map Choice1Of2) (b |> map Choice2Of2)

/// Cancels a computation and returns None if the CancellationToken is cancelled before the
/// computation completes.
let withCancellation (ct:CancellationToken) (a:Async<'a>) : Async<'a> = async {
let! ct2 = Async.CancellationToken
use cts = CancellationTokenSource.CreateLinkedTokenSource (ct, ct2)
let tcs = new TaskCompletionSource<'a>()
use _reg = cts.Token.Register (fun () -> tcs.TrySetCanceled() |> ignore)
let a = async {
try
let! a = a
tcs.TrySetResult a |> ignore
with ex ->
tcs.TrySetException ex |> ignore }
Async.Start (a, cts.Token)
return! tcs.Task |> awaitTaskCancellationAsError }

/// Cancels a computation and returns None if the CancellationToken is cancelled before the
/// computation completes.
let cancelTokenWith (ct:CancellationToken) (f:unit -> 'a) (a:Async<'a>) : Async<'a> = async {
Expand Down Expand Up @@ -374,38 +336,11 @@ module Async =
tcs.TrySetException ex |> ignore }
Async.Start (a, cts.Token)
return! tcs.Task |> awaitTaskCancellationAsError }

let cancelWithTask (t:Task<unit>) (a:Async<'a>) : Async<'a option> = async {
let! ct = Async.CancellationToken
use cts = CancellationTokenSource.CreateLinkedTokenSource ct
let t = t.ContinueWith (fun (_:Task<unit>) -> cts.Cancel () ; None)
let at = Async.StartAsTask (a, cancellationToken = cts.Token) |> Task.map Some
let! r = Task.WhenAny (t, at) |> awaitTaskCancellationAsError
return r.Result }

let cancelWithTaskThrow (err:exn -> exn) (t:Task<unit>) (a:Async<'a>) : Async<'a> = async {
let! ct = Async.CancellationToken
use cts = CancellationTokenSource.CreateLinkedTokenSource ct
let t =
t
|> Task.extend (fun t ->
cts.Cancel () |> ignore
raise (err t.Exception))
let at = Async.StartAsTask (a, cancellationToken = cts.Token)
let! r = Task.WhenAny (at, t) |> awaitTaskCancellationAsError
return r.Result }

let cancelWithTaskTimeout (timeout:TimeSpan) (t:Task<unit>) (a:Async<'a>) : Async<'a> = async {
let timeout = (Task.Delay timeout).ContinueWith (fun (_:Task) -> failwith "task_timedout")
let t = t |> Task.extend (fun _ -> failwith "task_cancelled")
let! at = Async.StartChildAsTask a
let! r = Task.WhenAny (at, t, timeout) |> awaitTaskCancellationAsError
return r.Result }

let sleep (s:TimeSpan) : Async<unit> =
Async.Sleep (int s.TotalMilliseconds)

let timeoutWith (g:'a -> 'b) (f:unit -> 'b) (timeout:TimeSpan) (c:Async<'a>) : Async<'b> =
let private timeoutWith (g:'a -> 'b) (f:unit -> 'b) (timeout:TimeSpan) (c:Async<'a>) : Async<'b> =
let timeout = async {
do! sleep timeout
return f () }
Expand All @@ -420,21 +355,6 @@ module Async =
let timeoutResult (timeout:TimeSpan) (c:Async<'a>) : Async<Result<'a, TimeoutException>> =
timeoutResultWith (fun () -> TimeoutException(sprintf "The operation timed out after %fsec" timeout.TotalSeconds)) timeout c

let chooseTasks (a:Task<'T>) (b:Task<'U>) : Async<Choice<'T * Task<'U>, 'U * Task<'T>>> =
async {
let! ct = Async.CancellationToken
let i = Task.WaitAny( [| (a :> Task);(b :> Task) |],ct)
if i = 0 then return (Choice1Of2 (a.Result, b))
elif i = 1 then return (Choice2Of2 (b.Result, a))
else return! failwith (sprintf "unreachable, i = %d" i) }

let chooseTasksAsync (a:Task<'T>) (b:Task<'U>) : Async<Choice<'T * Task<'U>, 'U * Task<'T>>> = async {
let ta, tb = a :> Task, b :> Task
let! i = Task.WhenAny( ta, tb ) |> awaitTaskCancellationAsError
if i = ta then return (Choice1Of2 (a.Result, b))
elif i = tb then return (Choice2Of2 (b.Result, a))
else return! failwith "unreachable" }

let chooseTasksUnit (a:Task<'T>) (b:Task) : Async<Choice<'T * Task, unit * Task<'T>>> = async {
let ta, tb = a :> Task, b
let! i = Task.WhenAny( ta, tb ) |> awaitTaskCancellationAsError
Expand Down
39 changes: 25 additions & 14 deletions src/kafunk/Utility/Resource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ open System.Threading
open System.Threading.Tasks
open Kafunk
open FSharp.Control
open Kafunk.RetryPolicy

let private Log = Log.create "Kafunk.Resource"

Expand Down Expand Up @@ -108,10 +107,10 @@ type Resource<'r> internal (id:string, create:Task<unit> -> ResourceEpoch<'r> op
if fault then currentEpoch.tryCloseOrFault ex
else currentEpoch.tryCloseOrFault None
if closed then
Log.trace "closing_resource|type=%s version=%i" name currentEpoch.version
currentEpoch.cts.Dispose ()
Log.trace "closing_resource|type=%s version=%i" name currentEpoch.version
do! close (currentEpoch, ex)
return { currentEpoch with version = currentEpoch.version + 1 ; resource = Unchecked.defaultof<_> }
currentEpoch.cts.Dispose ()
return { currentEpoch with version = currentEpoch.version + 1 ; resource = Unchecked.defaultof<_> }
else
Log.trace "resource_already_closed|type=%s calling_version=%i current_version=%i"
name callingEpoch.version currentEpoch.version
Expand Down Expand Up @@ -208,20 +207,32 @@ module Resource =
let getResource (r:Resource<'r>) =
r.Get () |> Async.map (fun ep -> ep.resource)

let private resourceError (ex:exn) =
if isNull ex then ClosedResourceException() :> exn
else FaultedResourceException(ex) :> exn

// NB: to avoid exceptions when the token is already disposed
let private resourceEpochCancellationToken (ep:ResourceEpoch<'r>) =
try ep.cts.Token
with :? ObjectDisposedException ->
let cts = new CancellationTokenSource()
cts.Cancel ()
cts.Token

let injectWithRecovery (rp:RetryPolicy) (op:'r -> ('a -> Async<Result<'b, ResourceErrorAction<'b, exn>>>)) (r:Resource<'r>) (a:'a) : Async<'b> =
let rec go (rs:RetryState) = async {
try
let! ep = r.Get ()
let! b = Async.cancelWithTaskThrow resourceError ep.state.Task (op ep.resource a)
//let! b = op ep.resource a
let! b = Async.cancelWithToken (resourceEpochCancellationToken ep) (op ep.resource a)
match b with
| Success b ->
| None ->
let! rs' = RetryPolicy.awaitNextState rp rs
match rs' with
| None ->
let msg = sprintf "escalating_after_retry_attempts_depleted|type=%s v=%i attempt=%i" r.Name ep.version rs.attempt
//Log.trace "%s" msg
return raise (FaultedResourceOperationException(msg, None))
| Some rs' ->
return! go rs'
| Some (Success b) ->
return b
| Failure (Retry e) ->
| Some (Failure (Retry e)) ->
Log.trace "retrying_after_failure|type=%s version=%i attempt=%i error=\"%O\""
r.Name ep.version rs.attempt e
let! rs' = RetryPolicy.awaitNextState rp rs
Expand All @@ -232,12 +243,12 @@ module Resource =
return raise (FaultedResourceOperationException(msg, e))
| Some rs' ->
return! go rs'
| Failure (CloseResume (ex,b)) ->
| Some (Failure (CloseResume (ex,b))) ->
Log.trace "closing_and_resuming_after_failure|type=%s v=%i attempt=%i error=\"%O\""
r.Name ep.version rs.attempt ex
do! r.Close (ep,ex)
return b
| Failure (CloseRetry ex) ->
| Some (Failure (CloseRetry ex)) ->
// TODO: collect errors?
Log.trace "closing_and_retrying_after_failure|type=%s v=%i attempt=%i error=\"%O\""
r.Name ep.version rs.attempt ex
Expand Down
125 changes: 113 additions & 12 deletions tests/kafunk.Tests/AsyncTest.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,119 @@ open System.Threading
open System.Threading.Tasks
open System.Diagnostics

let choose (a:Async<'a>) (b:Async<'a>) : Async<'a> = async {
let! ct = Async.CancellationToken
return!
Async.FromContinuations <| fun (ok,err,cnc) ->
let state = ref 0
let cts = CancellationTokenSource.CreateLinkedTokenSource ct
let cancel () =
cts.Cancel()
cts.Dispose()
let ok a =
if (Interlocked.CompareExchange(state, 1, 0) = 0) then
ok a
cancel ()
let err (ex:exn) =
if (Interlocked.CompareExchange(state, 1, 0) = 0) then
cancel ()
err ex
let cnc ex =
if (Interlocked.CompareExchange(state, 1, 0) = 0) then
cancel ()
cnc ex
Async.startThreadPoolWithContinuations (a, ok, err, cnc, cts.Token)
Async.startThreadPoolWithContinuations (b, ok, err, cnc, cts.Token) }

//let file = @"C:\code\kafunk\tests\kafunk.Tests\FetchResponse_RecordBatch.bin"
let file = @"C:\code\kafunk\tests\kafunk.Tests\fetch_p=0_p=3_daccd6f71ae945f5afe7cb57a3a7ee28.bin"
let buf = System.IO.File.ReadAllBytes(file) |> Binary.ofArray |> BinaryZipper
let res = FetchResponse.Read (5s, buf)

for (t,ps) in res.topics do
for (p,_,hwo,_,_,_,_,ms) in ps do
let cms = ConsumerMessageSet(t,p,ms,hwo)
let lastOffset = ConsumerMessageSet.lastOffset cms
let nextOffset = MessageSet.nextOffset ms hwo
printfn "p=%i message_count=%i last_offset=%i next_offset=%i" p ms.messages.Length lastOffset nextOffset
for m in ms.messages do
printfn "key=%s value=%s" (Binary.toString m.message.value) (Binary.toString m.message.value)
let chooseTaskOrAsync2 (t:Task<'a>) (a:Async<'a>) : Async<'a> = async {
let! ct = Async.CancellationToken
use cts = CancellationTokenSource.CreateLinkedTokenSource ct
let tcs = new TaskCompletionSource<_>()
use _reg = cts.Token.Register (fun () -> tcs.TrySetCanceled () |> ignore)
IVar.intoCancellationToken cts tcs
//let opts = TaskContinuationOptions.DenyChildAttach
//let t = Task.Factory.ContinueWhenAny([|t|], (fun (t:Task<_>) -> tcs.TrySetResult t.Result |> ignore), cts.Token)
let _t = t.ContinueWith((fun (t:Task<'a>) -> tcs.TrySetResult t.Result |> ignore ; cts.Cancel ()), cts.Token)
let a = async {
try
let! a = a
tcs.TrySetResult a |> ignore
cts.Cancel ()
with ex ->
tcs.TrySetException ex |> ignore
cts.Cancel () }
Async.Start (a, cts.Token)
return! tcs.Task |> Async.awaitTaskCancellationAsError }

let chooseTaskOrAsync (t:Task<'a>) (a:Async<'a>) : Async<'a> = async {
let! ct = Async.CancellationToken
use cts = CancellationTokenSource.CreateLinkedTokenSource ct
let tcs = new TaskCompletionSource<_>()
IVar.intoCancellationToken cts tcs
let t = async {
try
let! t = Async.AwaitTask t
tcs.TrySetResult t |> ignore
with ex ->
tcs.TrySetException ex |> ignore }
let! _ = Async.StartChild t
let a = async {
try
let! a = a
tcs.TrySetResult a |> ignore
with ex ->
tcs.TrySetException ex |> ignore }
let! _ = Async.StartChild a
return! tcs.Task |> Async.awaitTaskCancellationAsError }

let cancelWithToken (ct:CancellationToken) (a:Async<'a>) : Async<'a option> = async {
let! ct2 = Async.CancellationToken
use cts = CancellationTokenSource.CreateLinkedTokenSource (ct, ct2)
let tcs = new TaskCompletionSource<'a option>()
let _reg = cts.Token.Register (fun () -> tcs.TrySetResult None |> ignore)
let a = async {
try
let! a = a
tcs.TrySetResult (Some a) |> ignore
with ex ->
tcs.TrySetException ex |> ignore }
Async.Start (a, cts.Token)
return! tcs.Task |> Async.awaitTaskCancellationAsError }


let go2 = async {
let cts = new CancellationTokenSource()
let N = 1000000
return!
Seq.init N id
|> Seq.map (fun i -> async {
do! cancelWithToken cts.Token (Async.Sleep 100) |> Async.Ignore
return () })
|> Async.parallelThrottledIgnore 1000
}

let go = async {
let t = Task.never
let N = 1000000
return!
Seq.init N id
|> Seq.map (fun i -> async {
do! chooseTaskOrAsync2 t (Async.Sleep 100)
return () })
|> Async.parallelThrottledIgnore 1000
}


let go3 = async {
let t = Task.never |> Async.AwaitTask
let N = 1000000
return!
Seq.init N id
|> Seq.map (fun i -> async {
do! choose t (Async.Sleep 100)
return () })
|> Async.parallelThrottledIgnore 1000
}

Async.RunSynchronously go2
4 changes: 2 additions & 2 deletions tests/kafunk.Tests/Consumer.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ let go = async {
groupId = group,
topic = topic,
autoOffsetReset = AutoOffsetReset.StartFromTime Time.EarliestOffset,
fetchMaxBytes = 500000,
fetchMaxBytes = 1000000,
fetchMaxBytesTotal = 50000000,
fetchMaxBytesOverride = 1000000,
//fetchMinBytes = 0,
Expand Down Expand Up @@ -72,7 +72,7 @@ let go = async {
//use! _cnc = Async.OnCancel (fun () -> Log.warn "cancelling_handler")
//for m in ms.messageSet.messages do
// Log.info "key=%s" (Binary.toString m.message.key)
Log.info "consuming_message_set|topic=%s partition=%i count=%i size=%i os=[%i-%i] ts=[%O] hwo=%i lag=%i"
Log.trace "consuming_message_set|topic=%s partition=%i count=%i size=%i os=[%i-%i] ts=[%O] hwo=%i lag=%i"
ms.topic
ms.partition
(ms.messageSet.messages.Length)
Expand Down
Loading

0 comments on commit f59d47b

Please sign in to comment.