Pekko Streams Circe Support
This library provides Json support for stream-based applications using jawn as a parser. It supports every backend that jawn supports; circe support is provided as an example.
It is a fork of akka-streams-json built with Apache Pekko; akka-streams-json is itself a fork of https://github.com/knutwalker/akka-stream-json.
Installation
There are two main modules, pekko-stream-json and pekko-http-json. pekko-stream-json is the basis and provides the stream-based parser, while pekko-http-json enables support for using the desired json library as an Unmarshaller.
libraryDependencies ++= List(
  "org.mdedetrich" %% "pekko-stream-json" % "<VERSION>",
  "org.mdedetrich" %% "pekko-http-json" % "<VERSION>"
)
pekko-stream-circe is published for Scala 2.12, 2.13 and 3.3+.
Usage
The parser lives at org.mdedetrich.pekko.json.stream.JsonStreamParser.
Use one of the constructor methods in the companion object to create the parser at various levels of abstraction: a Stage, a Flow, or a Sink. Add the jawn support facade of your choice and you can parse into its respective Json AST.
For Http support, either import org.mdedetrich.pekko.http.JsonSupport._ or mix in ... with org.mdedetrich.pekko.http.JsonSupport.
Given an implicit jawn facade, this enables you to decode into the respective Json AST using the Pekko HTTP marshalling framework. As jawn is only about parsing and does not abstract over rendering, you'll only get an Unmarshaller.
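As a rough illustration of the paragraph above, the sketch below unmarshals a strict entity into circe's Json AST. It assumes that the JsonSupport._ import brings an entity unmarshaller into scope for any type with an implicit jawn Facade, as the text describes, and that circe-jawn is on the classpath to supply the facade. The object name UnmarshalExample and the sample payload are hypothetical.

```scala
import io.circe.Json
import io.circe.jawn.CirceSupportParser
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.model.{ContentTypes, HttpEntity}
import org.apache.pekko.http.scaladsl.unmarshalling.Unmarshal
import org.typelevel.jawn.Facade
// Import taken from the text above; assumed to provide the implicit unmarshaller.
import org.mdedetrich.pekko.http.JsonSupport._

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of the library.
object UnmarshalExample {
  // The jawn facade that tells the parser how to build circe's Json AST.
  implicit val facade: Facade[Json] = CirceSupportParser.facade

  def run(): Json = {
    // The actor system also supplies the implicit Materializer.
    implicit val system: ActorSystem = ActorSystem("unmarshal-example")
    try {
      val entity = HttpEntity(ContentTypes.`application/json`, """{"status":"ok"}""")
      // Blocking here only keeps the sketch short; real code would stay asynchronous.
      Await.result(Unmarshal(entity).to[Json], 5.seconds)
    } finally system.terminate()
  }
}
```

In a route, the same implicit would typically be picked up by the entity(as[Json]) directive instead of calling Unmarshal directly.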
Circe
libraryDependencies ++= List(
"org.mdedetrich" %% "pekko-stream-circe" % "<VERSION>",
"org.mdedetrich" %% "pekko-http-circe" % "<VERSION>"
)
(Using circe 0.14.x)
Adding support for a specific framework is quite easy.
These support modules allow you to directly marshall from/unmarshall into your data types using circe's Decoder and Encoder type classes.
Just mix in or import org.mdedetrich.pekko.http.support.CirceHttpSupport for Http, or pipe your Source[ByteString, _].via(org.mdedetrich.pekko.stream.CirceStreamSupport.decode[A]) to get a Source[A, _].
This flow even supports parsing multiple json documents in whatever fragmentation they may arrive, which is great for consuming stream/sse based APIs.
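The following is a minimal sketch of that decode flow, fed with two newline-separated documents that are split at arbitrary byte boundaries. The Tick case class, its decoder, and the DecodeExample object are hypothetical. The import path for CirceStreamSupport is copied from the text above and may need adjusting to match the published artifact.

```scala
import io.circe.Decoder
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import org.apache.pekko.util.ByteString
// Path as written in this README; treat it as an assumption and verify it.
import org.mdedetrich.pekko.stream.CirceStreamSupport

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of the library.
object DecodeExample {
  final case class Tick(id: Int, label: String)

  implicit val tickDecoder: Decoder[Tick] =
    Decoder.forProduct2("id", "label")(Tick.apply)

  def run(): Seq[Tick] = {
    // The actor system also supplies the implicit Materializer.
    implicit val system: ActorSystem = ActorSystem("decode-example")
    try {
      // Two documents, fragmented in the middle of keys and values.
      val chunks = List(
        """{"id":1,"la""",
        "bel\":\"a\"}\n{\"id\":2,",
        """"label":"b"}"""
      ).map(ByteString(_))

      val decoded = Source(chunks)
        .via(CirceStreamSupport.decode[Tick])
        .runWith(Sink.seq)

      // Blocking here only keeps the sketch short.
      Await.result(decoded, 5.seconds)
    } finally system.terminate()
  }
}
```

Because the flow emits one element per completed document, downstream stages never see the chunk boundaries.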
If there is an error in parsing the Json, you can catch org.mdedetrich.pekko.http.support.CirceStreamSupport.JsonParsingException.
The exception provides the Circe cursor history, the current cursor and the type hint of the error.
Why jawn?
Jawn provides a nice interface for asynchronous parsing. Most other Json marshalling providers first consume the complete entity, convert it to a string, and only then start to parse. With jawn, the json is parsed incrementally with each arriving data chunk, directly using the underlying ByteBuffers without conversion.
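To make the incremental behaviour concrete, here is a small sketch that drives jawn's AsyncParser by hand, without any streams involved. It assumes circe-jawn is on the classpath to provide the facade; the IncrementalParse object and parseChunks helper are hypothetical names and not part of this library.

```scala
import io.circe.Json
import io.circe.jawn.CirceSupportParser
import org.typelevel.jawn.{AsyncParser, Facade}

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical helper, shown only to illustrate chunk-by-chunk parsing.
object IncrementalParse {
  implicit val facade: Facade[Json] = CirceSupportParser.facade

  def parseChunks(chunks: List[String]): Either[Throwable, List[Json]] = {
    // ValueStream mode accepts a series of top-level values.
    val parser = AsyncParser[Json](AsyncParser.ValueStream)

    // Feed each chunk as a ByteBuffer; every call returns the values completed so far.
    val absorbed = chunks.foldLeft[Either[Throwable, List[Json]]](Right(Nil)) { (acc, chunk) =>
      acc.flatMap { done =>
        parser.absorb(ByteBuffer.wrap(chunk.getBytes(UTF_8))).map(done ++ _)
      }
    }

    // finish() signals end of input and flushes any value still pending.
    absorbed.flatMap(done => parser.finish().map(done ++ _))
  }
}
```

Calling IncrementalParse.parseChunks(List("""{"a":""", """1} [2,""", "3]")) yields the object as soon as its closing brace arrives, while the array is held back until the final chunk completes it.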
License
This code is open source software licensed under the Apache 2.0 License.