I recently implemented an akka-stream flow which parse some json messages, validate the presence of a given key (destination_region
) and pass to the next stage a case class containing the original message and the destination_region
I implemented a custom decider so that in case it face any parsing or key error, it will trigger Supervision.Resume
after logging the exception.
A minimalistic implementation would look like:
package com.example.stages
import com.example.helpers.EitherHelpers._
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
object EitherHelpers {
implicit class ErrorEither[L <: Throwable, R](val either: Either[L, R]) extends AnyVal {
def asFuture: Future[R] = either.fold(Future.failed, Future.successful)
def asTry: Try[R] = either.fold(Failure.apply, Success.apply)
import scala.concurrent.ExecutionContext
import akka.NotUsed
import akka.stream.scaladsl.Flow
import akka.stream.ActorAttributes.supervisionStrategy
import akka.stream.Supervision
import software.amazon.awssdk.services.sqs.model.Message
import io.circe.parser.parse
import io.circe.{DecodingFailure, ParsingFailure}
object MessageContentValidationFlow {
def apply()(
implicit executionContext: ExecutionContext): Flow[Message, MessageWithRegion, NotUsed] = {
val customDecider: Supervision.Decider = {
case e @ (_: DecodingFailure | _: ParsingFailure) => {
case _ => Supervision.Stop
.mapAsync[MessageWithRegion](2) { message =>
println(s"Received message: $message")
val messageWithRegion = for {
parsed <- parse(message.body()).asFuture
region <- parsed.hcursor.downField("destination_region").as[String].asFuture
} yield { MessageWithRegion(message, region) }
case class MessageWithRegion(message: Message, region: String)
I managed to test the case where the message is valid, however I have not clue about how to test the flow in case of ParsingFailure
or DecodingFailure
. I have tried almost all methods available for sub
in the implementation below:
package com.example.stages
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import io.circe.generic.JsonCodec, io.circe.syntax._
import io.circe.generic.auto._
import software.amazon.awssdk.services.sqs.model.Message
import org.scalatest.FlatSpec
@JsonCodec case class MessageBody(destination_region: String)
class MessageContentValidationFlowSpecs extends FlatSpec {
implicit val system = ActorSystem("MessageContentValidationFlow")
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher
val (pub, sub) = TestSource.probe[Message]
"MessageContentValidationFlow" should "process valid messages" in {
val validRegion = "eu-west-1"
val msgBody = MessageBody(validRegion).asJson.toString()
val validMessage = Message.builder().body(msgBody).build()
sub.request(n = 1)
val expectedMessageWithRegion = MessageWithRegion(
message = validMessage,
region = validRegion
assert(sub.requestNext() == expectedMessageWithRegion)
ignore should "trigger Supervision.Resume with empty messages" in {
val emptyMessage = Message.builder().body("").build()
assert(emptyMessage.body() == "")
sub.request(n = 1)
Does anyone know how to test that Supervision.Resume
was triggered and which exception was caught by the custom decider?
