Graceful handling of incomplete streaming line data #4

Open
ghost opened this issue Dec 3, 2015 · 5 comments

Comments


ghost commented Dec 3, 2015

I've recently seen some errors in the streaming API with the message
"unhandled event MessageChunk(Bytes(<xxx bytes>),) in state Disconnected"
where xxx is a number.
Looking at the earlier log messages, this seems to have been caused by an attempt to parse invalid JSON. In my case it was {"tick":{"instrumen . I'm guessing an incomplete line was sent from Oanda.

Looking at StreamingConnector.scala line 112, the statement line.parseJson.asJsObject.fields.head assumes the line is valid JSON. When it is not, a spray.json.JsonParser$ParsingException is thrown.

A possible workaround may be to put a catch around line.parseJson.asJsObject.fields.head and return an Option[JSON], which could then be an additional case to match against.
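
A minimal sketch of that workaround, using scala.util.Try so that a failed parse becomes None instead of an exception that restarts the actor. The names SafeParse and parseLine are hypothetical and not part of scalanda; the parser is passed in as a function so the sketch runs without spray-json on the classpath.

```scala
import scala.util.{Failure, Success, Try}

object SafeParse {
  /** Runs `parse` on `line`; returns None (and logs) if the parser throws.
    * `parse` is a stand-in for the real parsing step. With spray-json
    * (import spray.json._) it could be
    *   _.parseJson.asJsObject.fields.head
    * as on StreamingConnector.scala line 112.
    */
  def parseLine[A](line: String)(parse: String => A): Option[A] =
    Try(parse(line)) match {
      case Success(value) => Some(value)
      case Failure(e) =>
        // Log the offending line so truncated input is visible later.
        Console.err.println(s"Unable to parse into json: $line (${e.getMessage})")
        None
    }
}
```

The caller can then match on Some(...) for a parsed message and on None for a line to skip. Try only catches non-fatal exceptions, which covers the ParsingException seen in the log.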

Hope this all makes sense.

Owner

msilb commented Dec 3, 2015

Hi @fluxgain, thanks for raising this. It would be interesting to find out why it happens, that is, why Oanda sends incomplete or invalid JSON strings. Could it be related to the configuration of the HTTP client (spray.io)? It's easy enough to wrap the response in an Option type, but it would be good to understand why this happens. Any ideas?

Author

ghost commented Dec 4, 2015

Yes, it does feel strange that Oanda would send incomplete JSON. Perhaps it is some intermittent internet connectivity issue. What I was not able to figure out is what happens after the bad line: does it send the rest of the line or a brand new line? The logs show unhandled MessageChunk events after the JSON error, and no further lines are processed. So at the moment I don't think there is enough information to get to the bottom of the issue. Perhaps the Option type, together with a dump of the bytes in the MessageChunk, would provide more information to help solve it.

I've attached the log file that might provide more insight. Hope this helps.

INFO : com.msilb.scalanda.streamapi.StreamingConnector - Received new price tick: Tick(USD_JPY,2015-12-03T12:40:19.205456Z,123.289,123.316)
INFO : com.msilb.scalanda.streamapi.StreamingConnector - Received new price tick: Tick(USD_CHF,2015-12-03T12:40:19.209939Z,1.01824,1.01899)
ERROR: akka.actor.OneForOneStrategy - Unexpected end-of-input at input index 19 (line 1, position 20), expected '"':
{"tick":{"instrumen
               ^

spray.json.JsonParser$ParsingException: Unexpected end-of-input at input index 19 (line 1, position 20), expected '"':
{"tick":{"instrumen
               ^

    at spray.json.JsonParser.fail(JsonParser.scala:213)
    at spray.json.JsonParser.require(JsonParser.scala:196)
    at spray.json.JsonParser.string(JsonParser.scala:144)
    at spray.json.JsonParser.members$1(JsonParser.scala:77)
    at spray.json.JsonParser.object(JsonParser.scala:86)
    at spray.json.JsonParser.value(JsonParser.scala:60)
    at spray.json.JsonParser.members$1(JsonParser.scala:81)
    at spray.json.JsonParser.object(JsonParser.scala:86)
    at spray.json.JsonParser.value(JsonParser.scala:60)
    at spray.json.JsonParser.parseJsValue(JsonParser.scala:43)
    at spray.json.JsonParser$.apply(JsonParser.scala:28)
    at spray.json.PimpedString.parseJson(package.scala:45)
    at com.msilb.scalanda.streamapi.StreamingConnector$$anonfun$4$$anonfun$applyOrElse$1.apply(StreamingConnector.scala:112)
    at com.msilb.scalanda.streamapi.StreamingConnector$$anonfun$4$$anonfun$applyOrElse$1.apply(StreamingConnector.scala:111)
    at scala.collection.Iterator$class.foreach(Iterator.scala:742)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
    at com.msilb.scalanda.streamapi.StreamingConnector$$anonfun$4.applyOrElse(StreamingConnector.scala:111)
    at com.msilb.scalanda.streamapi.StreamingConnector$$anonfun$4.applyOrElse(StreamingConnector.scala:84)
    at com.msilb.scalanda.streamapi.StreamingConnector$$anonfun$4.applyOrElse(StreamingConnector.scala:84)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at akka.actor.FSM$class.processEvent(FSM.scala:654)
    at com.msilb.scalanda.streamapi.StreamingConnector.processEvent(StreamingConnector.scala:55)
    at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:648)
    at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:642)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:480)
    at com.msilb.scalanda.streamapi.StreamingConnector.aroundReceive(StreamingConnector.scala:55)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:525)
    at akka.actor.ActorCell.invoke(ActorCell.scala:494)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
WARN : com.msilb.scalanda.streamapi.StreamingConnector - unhandled event MessageChunk(Bytes(<879 bytes>),) in state Disconnected
WARN : com.msilb.scalanda.streamapi.StreamingConnector - unhandled event MessageChunk(Bytes(<1598 bytes>),) in state Disconnected
WARN : com.msilb.scalanda.streamapi.StreamingConnector - unhandled event MessageChunk(Bytes(<3864 bytes>),) in state Disconnected
WARN : com.msilb.scalanda.streamapi.StreamingConnector - unhandled event MessageChunk(Bytes(<333 bytes>),) in state Disconnected
WARN : com.msilb.scalanda.streamapi.StreamingConnector - unhandled event MessageChunk(Bytes(<300 bytes>),) in state Disconnected
WARN : com.msilb.scalanda.streamapi.StreamingConnector - unhandled event MessageChunk(Bytes(<993 bytes>),) in state Disconnected
WARN : com.msilb.scalanda.streamapi.StreamingConnector - unhandled event MessageChunk(Bytes(<998 bytes>),) in state Disconnected

Author

ghost commented Jan 17, 2016

Hi @msilb, I put the Option into the code as outlined above. The error did occur again, but with the Option in place, successive ticks are now processed correctly rather than failing. Also, when an error occurs it looks like only one tick is affected, and it is split into two lines: one error for the first part of the line and a second error for the last part.

Below is my log snippet -

2016-01-08 13:30:04,624 INFO (ActorSystem-akka.actor.default-dispatcher-28) [com.msilb.scalanda.streamapi.StreamingConnector] - Received new price tick: Tick(EUR_USD,2016-01-08T13:30:03.095937Z,1.08146,1.08246)
2016-01-08 13:30:04,626 ERROR (ActorSystem-akka.actor.default-dispatcher-28) [com.msilb.scalanda.streamapi.StreamingConnector] - Unable to parse into json: {"tick":{"instrument":"EUR_JPY","time":"2016-01-08T13:30:03.047139Z","bi
2016-01-08 13:30:04,626 ERROR (ActorSystem-akka.actor.default-dispatcher-28) [com.msilb.scalanda.streamapi.StreamingConnector] - Unable to parse into json: d":128.441,"ask":128.541}}
=== Dead letter DeadLetter(Tick(USD_JPY,2016-01-08T13:30:03.010932Z,118.745,118.845),Actor[akka://ActorSystem/user/mainActor/oandaStreamingConnectorActor#523480056],Actor[akka://ActorSystem/user/mainActor/oandaStreamingListenerActor#-352058736])
2016-01-08 13:30:04,626 INFO (ActorSystem-akka.actor.default-dispatcher-30) [com.msilb.scalanda.streamapi.StreamingConnector] - Received new price tick: Tick(USD_JPY,2016-01-08T13:30:03.010932Z,118.745,118.845)
=== Dead letter DeadLetter(Tick(USD_JPY,2016-01-08T13:30:03.078363Z,118.688,118.788),Actor[akka://ActorSystem/user/mainActor/oandaStreamingConnectorActor#523480056],Actor[akka://ActorSystem/user/mainActor/oandaStreamingListenerActor#-352058736])
2016-01-08 13:30:04,626 INFO (ActorSystem-akka.actor.default-dispatcher-30) [com.msilb.scalanda.streamapi.StreamingConnector] - Received new price tick: Tick(USD_JPY,2016-01-08T13:30:03.078363Z,118.688,118.788)

So the Option workaround limits the number of tick errors but does not help identify the root cause.
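
The two "Unable to parse" entries above join into one valid tick, which suggests a line split across two chunks. One possible way to handle that, sketched under the assumption that the stream is newline-delimited JSON and that each chunk has already been decoded to a String, is to hold back the unterminated tail of a chunk until the next one arrives. LineBuffer, State and feed are hypothetical names, not scalanda or spray APIs.

```scala
object LineBuffer {
  /** Holds the unterminated tail left over from the previous chunk. */
  final case class State(partial: String = "")

  /** Appends `chunk` to the pending tail and returns every complete line,
    * plus the new state carrying whatever follows the last newline.
    */
  def feed(state: State, chunk: String): (Vector[String], State) = {
    val data = state.partial + chunk
    val lastNewline = data.lastIndexOf('\n')
    if (lastNewline < 0) {
      // No line terminator yet: keep buffering.
      (Vector.empty, State(data))
    } else {
      val complete = data
        .substring(0, lastNewline)
        .split('\n')
        .iterator
        .map(_.trim)          // drops a trailing '\r' if lines end in CRLF
        .filter(_.nonEmpty)   // skips blank lines
        .toVector
      (complete, State(data.substring(lastNewline + 1)))
    }
  }
}
```

In an actor, the State would live in the FSM state data and be threaded through each MessageChunk; only the returned complete lines would be passed on to the JSON parser.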

Owner

msilb commented Jan 17, 2016

Hi @fluxgain thanks for checking this. It might have something to do with the underlying spray http client configuration. I tried a few settings but was unable to find out what's causing it. If you can make a PR, I'll merge it into the code base and release a new version with the fix.

@janrockdev

Hi, I have the same issue, and it seems to be related to a limit on the number of characters returned by the API. Three trades are OK; with four it reaches the default maximum and prints this at the end:

":"100151.5521","marg ... (4320 bytes total)),HttpProtocol(HTTP/1.1))

...which causes the JSON parsing to crash.
There is probably some way to remove the size limit for the GET call, but I have no idea how to add it with the correct syntax.
Hint: HttpEntity.IndefiniteLength
