RecordIO Framing

The codec parses a ByteString stream in the RecordIO format into distinct frames.

For instance, the response body:

128\n
{"type": "SUBSCRIBED","subscribed": {"framework_id": {"value":"12220-3440-12532-2345"},"heartbeat_interval_seconds":15.0}20\n
{"type":"HEARTBEAT"}

is parsed into frames:

{"type": "SUBSCRIBED","subscribed": {"framework_id": {"value":"12220-3440-12532-2345"},"heartbeat_interval_seconds":15.0}
{"type":"HEARTBEAT"}

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-simple-codecs" % "0.11"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-simple-codecs_2.12</artifactId>
  <version>0.11</version>
</dependency>
Gradle
dependencies {
  compile group: "com.lightbend.akka", name: "akka-stream-alpakka-simple-codecs_2.12", version: "0.11"
}

Usage

The helper object RecordIOFraming provides a scanner factory method for a Flow[ByteString, ByteString, _] which parses out RecordIO frames.

For instance, given the sample input:

Scala
val FirstRecordData =
  """{"type": "SUBSCRIBED","subscribed": {"framework_id": {"value":"12220-3440-12532-2345"},"heartbeat_interval_seconds":15.0}"""
val SecondRecordData = """{"type":"HEARTBEAT"}"""

val FirstRecordWithPrefix = s"121\n$FirstRecordData"
val SecondRecordWithPrefix = s"20\n$SecondRecordData"

val basicSource = Source.single(ByteString(FirstRecordWithPrefix + SecondRecordWithPrefix))

Running it through the RecordIO framing flow:

Scala
val result = basicSource via
RecordIOFraming.scanner() runWith
Sink.seq

We obtain:

Scala
result.futureValue shouldBe Seq(ByteString(FirstRecordData), ByteString(SecondRecordData))

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
sbt
> simpleCodecs/testOnly *.RecordIOFramingSpec
The source code for this page can be found here.