
Into ReaderT-verse

To keep this article relatively short, let's narrow the scope of the problem.

The Program

I'll start with a simple program that will probably give you déjà vu.

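import com.amazonaws.services.s3.AmazonS3ClientBuilder

val sourceBucketName = "source-bucket"
val fileName = "fileA"
val targetBucketName = "target-bucket"
val awsClient = AmazonS3ClientBuilder.standard().build()
def pureBusinessProcess(content: String): String = ???
def program = {
  // For brevity, getObject is treated as returning the file content as a String
  // (in the AWS SDK for Java v1 that is what getObjectAsString does).
  val content = awsClient.getObject(sourceBucketName, fileName)
  val result = pureBusinessProcess(content)
  // Write the processed result to the target bucket.
  awsClient.putObject(targetBucketName, fileName, result)
}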

It's a pretty simple program:

  1. fetch a file from S3 bucket A
  2. process the content according to what business you have
  3. put the result into a file in bucket B

The Problem

Such a program looks pretty clean and readable, but an implementation like this is never good enough for production because it lacks:

  1. Error handling
  2. Testability
  3. Logging

Once we add those three things, the program is no longer clean and readable:

val awsClient = AmazonS3ClientBuilder.standard().build()
def pureBusinessProcess(content: String): String = ???
val logger = org.log4s.getLogger
def program(client: AmazonS3): Unit = {
  try {
    // Use the injected client, not the global awsClient.
    val content = try {
      client.getObject(sourceBucketName, fileName)
    } catch {
      case e: Throwable =>
        logger.error("error fetching file from S3")
        throw e
    }
    val result = pureBusinessProcess(content)
    try {
      client.putObject(targetBucketName, fileName, result)
    } catch {
      case e: Throwable =>
        logger.error("error putting file to S3")
        throw e
    }
  } catch {
    case e: Throwable => logger.error(s"error processing $fileName from $sourceBucketName: ${e.getMessage}")
  }
}

So we factor the effectful client: AmazonS3 out as a parameter, which buys a small improvement in testability. However, this is absolute crap: the core process of the program is totally lost in an ocean of error handling and logging.

A better version with ReaderT

With ReaderT, we get better versions of:

  1. DI (Dependency Injection)
  2. Error handling, thanks to MonadError

Instead of taking the client as a parameter, we change the return type of program to ReaderT[IO, AmazonS3, Unit]:

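import cats.data.{Kleisli, ReaderT}
import cats.effect.IO
import cats.syntax.all._

def program: ReaderT[IO, AmazonS3, Unit] = for {
  content <- Kleisli((client: AmazonS3) => IO(client.getObject(sourceBucketName, fileName)))
    // onError on a ReaderT expects a ReaderT handler, so the logging IO is lifted with liftF
    .onError { case e: Throwable => Kleisli.liftF(IO(logger.error(s"error fetching file from S3: ${e.getMessage}"))) }
  result = pureBusinessProcess(content)
  _ <- Kleisli((client: AmazonS3) => IO(client.putObject(targetBucketName, fileName, result)))
    .onError { case e: Throwable => Kleisli.liftF(IO(logger.error("error putting file to S3"))) }
} yield ()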

OK, it's a little better, though I don't think it's very readable.
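For intuition about why changing the return type amounts to dependency injection, here is a minimal sketch of the idea behind ReaderT. It is not the cats implementation: MiniReaderT, Storage and the in-memory stub are hypothetical names made up for this illustration, toUpperCase stands in for pureBusinessProcess, and scala.util.Try stands in for IO so that the snippet needs only the standard library.

```scala
import scala.util.Try

object MiniReaderTSketch {
  // A stripped-down ReaderT: just a wrapper around Env => Try[A].
  final case class MiniReaderT[Env, A](run: Env => Try[A]) {
    // Sequence two steps; both steps receive the same environment.
    def flatMap[B](f: A => MiniReaderT[Env, B]): MiniReaderT[Env, B] =
      MiniReaderT(env => run(env).flatMap(a => f(a).run(env)))

    def map[B](f: A => B): MiniReaderT[Env, B] =
      MiniReaderT(env => run(env).map(f))
  }

  // Hypothetical stand-in for the S3 client.
  trait Storage {
    def get(bucket: String, key: String): String
    def put(bucket: String, key: String, content: String): String
  }

  // Nothing runs here: the program is a description that still needs a Storage.
  val program: MiniReaderT[Storage, String] = for {
    content <- MiniReaderT((s: Storage) => Try(s.get("source-bucket", "fileA")))
    result = content.toUpperCase
    receipt <- MiniReaderT((s: Storage) => Try(s.put("target-bucket", "fileA", result)))
  } yield receipt

  // The dependency is supplied only at the very end, when the program is run.
  val inMemory: Storage = new Storage {
    def get(bucket: String, key: String): String = "hello"
    def put(bucket: String, key: String, content: String): String = s"$bucket/$key=$content"
  }

  val outcome: Try[String] = program.run(inMemory)
}
```

The point of the sketch is that program never mentions a concrete Storage. Whoever calls run decides which one to use, which is all that dependency injection means here.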

Now for the best feature of functional programming: a ReaderT is a pure value, so you can move that "not so readable thing" anywhere and give it a reasonable name.

val fetchContent = Kleisli((client: AmazonS3) => IO(client.getObject(sourceBucketName, fileName)))
    .onError { case e: Throwable => Kleisli.liftF(IO(logger.error(s"error fetching file from S3: ${e.getMessage}"))) }

def putContent(content: String) = Kleisli((client: AmazonS3) => IO(client.putObject(targetBucketName, fileName, content)))
    .onError { case e: Throwable => Kleisli.liftF(IO(logger.error(s"error putting file to S3: ${e.getMessage}"))) }

// Yield the PutObjectResult so the value matches the declared return type.
def program: ReaderT[IO, AmazonS3, PutObjectResult] = for {
  content <- fetchContent
  result = pureBusinessProcess(content)
  resp <- putContent(result)
} yield resp

Fine, that's more readable, but testing won't be much different: I still need to mock the client and stub getObject and putObject.

val fakeClient = mock[AmazonS3]
val res = mock[PutObjectResult]
fakeClient.getObject(sourceBucketName, fileName) returns "some content"
fakeClient.putObject(targetBucketName, fileName, "processed") returns res
program.run(fakeClient).unsafeRunSync() must_== res

Not bad, but not good either. We can avoid mocking and stubbing by abstracting out another layer, just like in the 3 Layer Scala Cake.

Layer 2

trait Interpreter[F[_]] {
    def getObject(bucketName: String, fileName: String): F[String]
    def putObject(bucketName: String, fileName: String, content: String): F[PutObjectResult]
}

Layer 3

def program: ReaderT[IO, Interpreter[IO], PutObjectResult] = for {
  content <- Kleisli((i: Interpreter[IO]) => i.getObject(sourceBucketName, fileName))
  result = pureBusinessProcess(content)
  resp <- Kleisli((i: Interpreter[IO]) => i.putObject(targetBucketName, fileName, result))
} yield resp

Wrapping each interpreter call such as getObject in a Kleisli barely sacrifices readability, and the benefit is that we can swap Layer 2 for an arbitrary interpreter in tests:

val res = mock[PutObjectResult]
program.run(new Interpreter[IO] {
  def getObject(bucketName: String, fileName: String) = IO("some content")
  // putObject must return an F[PutObjectResult], so the stubbed value is wrapped in IO
  def putObject(bucketName: String, fileName: String, content: String) = IO.pure(res)
}).unsafeRunSync() must_== res
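The same trick covers the failure path without any mocking library. The following is a minimal sketch under stated assumptions rather than the article's actual test: scala.util.Try stands in for IO, the program is written as a plain function of the interpreter (the shape that ReaderT wraps), putObject returns a String where the real code returns PutObjectResult, toUpperCase stands in for pureBusinessProcess, and the names working, broken and putCalled are made up for the illustration.

```scala
import scala.util.{Failure, Success, Try}

object InterpreterTestSketch {
  // Same shape as the Layer 2 trait; the put result is simplified to String.
  trait Interpreter[F[_]] {
    def getObject(bucketName: String, fileName: String): F[String]
    def putObject(bucketName: String, fileName: String, content: String): F[String]
  }

  // Layer 3 as a plain function of the interpreter.
  def program(i: Interpreter[Try]): Try[String] = for {
    content <- i.getObject("source-bucket", "fileA")
    result = content.toUpperCase
    resp <- i.putObject("target-bucket", "fileA", result)
  } yield resp

  // Happy path: both operations succeed and putObject echoes what it was given.
  val working: Interpreter[Try] = new Interpreter[Try] {
    def getObject(bucketName: String, fileName: String): Try[String] =
      Success("some content")
    def putObject(bucketName: String, fileName: String, content: String): Try[String] =
      Success(content)
  }

  // Failure path: the fetch fails, so putObject should never be reached.
  var putCalled = false
  val broken: Interpreter[Try] = new Interpreter[Try] {
    def getObject(bucketName: String, fileName: String): Try[String] =
      Failure(new RuntimeException("no such key"))
    def putObject(bucketName: String, fileName: String, content: String): Try[String] = {
      putCalled = true
      Success(content)
    }
  }
}
```

A test can then check that program(working) succeeds with the processed content, that program(broken) is a Failure, and that putCalled is still false afterwards.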

ReaderT-verse

Finally, we just need to refactor a bit and it's production ready:

  1. Readable
  2. Testable, by swapping out Layer 2
  3. Better error handling, thanks to MonadError
  4. Logging
  5. Extensible (same as Tagless Final)
  6. Composable (same as Tagless Final)

  // Assumes Interpreter[F] has been extended with `def getEnv: F[Env]`.
  def program: ReaderT[IO, Interpreter[IO], PutObjectResult] = for {
    env <- Kleisli((i: Interpreter[IO]) => i.getEnv)
    content <- Kleisli((i: Interpreter[IO]) => i.getObject(env.source, env.fileName))
      .onError { case e: Throwable => Kleisli.liftF(IO(logger.error(s"error fetching file from S3: ${e.getMessage}"))) }
    result = pureBusinessProcess(content)
    resp <- Kleisli((i: Interpreter[IO]) => i.putObject(env.target, env.fileName, result))
      .onError { case e: Throwable => Kleisli.liftF(IO(logger.error(s"error putting file to S3: ${e.getMessage}"))) }
  } yield resp

  case class Env(source: String, target: String, fileName: String)
  val interpreter = new Interpreter[IO] {
    lazy val awsClient = AmazonS3ClientBuilder.standard().build()
    def getEnv = IO(Env("sourceBucket", "targetBucket", "fileA"))
    def getObject(bucketName: String, fileName: String) =
      IO(awsClient.getObject(bucketName, fileName))
    def putObject(bucketName: String, fileName: String, content: String) =
      IO(awsClient.putObject(bucketName, fileName, content))
  }

  program.run(interpreter).unsafeRunSync()
