Functional Scala Caching

雎鳩 ju jiu


https://circleci.com/gh/jcouyang/jujiu.svg?style=svg https://www.javadoc.io/badge/us.oyanglul/jujiu_2.12.svg?label=document

关关雎鸠 在河之洲 ("Guan, guan, cry the ospreys, on the islet in the river"). Do one thing and do it well. Part of the micro birds library series.

libraryDependencies += "us.oyanglul" %% "jujiu" % "0.2.1"

Making Caffeine Cats-friendly

 1: "it should be able to get and set cache" >> {
 2:   object cacheDsl extends CaffeineCache[IO, String, String]   // <-
 3:   val program = for {
 4:     r1 <- cacheDsl.fetch("not exist yet")                     // <-
 5:     r2 <- cacheDsl.fetch("not exist yet", _ => IO("default")) // <-
 6:     _ <- cacheDsl.put("not exist yet", "now exist")           // <-
 7:     r3 <- cacheDsl.fetch("not exist yet")
 8:     _ <- cacheDsl.clear("not exist yet")
 9:     r4 <- cacheDsl.fetch("not exist yet")
10:   } yield (r1, r2, r3, r4)
11:   program(Caffeine().sync)                                    // <-
12:     .unsafeRunSync() must_== ((None, "default", Some("now exist"), None))
13: }

This README is a literate programming file: all the code here is tangled into the test file.

Let me walk you through it line by line:

  • line-2 creates an instance of CaffeineCache whose effect type is IO, whose keys are String, and whose values are String as well
  • line-4 won't actually trigger any effect; it just returns a DSL value of type Kleisli[IO, Cache, Option[String]], which in plain English means "give me a Cache and I can give you an IO[Option[String]]"
  • line-5 is another fetch DSL: the second parameter is a function K => IO[V]; if the key does not exist in the cache, it runs the function, puts the result into the cache, and returns the value
  • line-6 updates the value of key "not exist yet" to "now exist"
  • line-11 is the idiomatic Scala syntax for building a synchronous Caffeine cache.
    Recall that the program is actually a Kleisli[IO, Cache, A], so calling program(Caffeine().sync)
    provides it a Cache and returns an IO. Calling .unsafeRunSync() on that IO triggers all the effects
    you described in program, and you get the actual result.
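To make the "a program is just a description" idea concrete without pulling in cats-effect or Caffeine, here is a minimal sketch. `Program` plays the role of `Kleisli[IO, Cache, A]` with the effect left out, and a `mutable.Map` stands in for the Caffeine cache; `KleisliIdeaSketch`, `Program` and `fetchOrLoad` are hypothetical names, not part of jujiu.

```scala
import scala.collection.mutable

// Hypothetical, dependency-free sketch of the idea behind the Kleisli-based DSL.
// Not jujiu's implementation: Program and Store are illustrative stand-ins.
object KleisliIdeaSketch {
  type Store = mutable.Map[String, String]

  // Program[A] ~ Kleisli[IO, Cache, A] minus the effect:
  // "give me a Store and I can give you an A".
  final case class Program[A](run: Store => A) {
    def map[B](f: A => B): Program[B] = Program(s => f(run(s)))
    def flatMap[B](f: A => Program[B]): Program[B] = Program(s => f(run(s)).run(s))
  }

  def fetch(k: String): Program[Option[String]] = Program(_.get(k))

  // On a miss, run the loader, store its result and return it.
  def fetchOrLoad(k: String, load: String => String): Program[String] =
    Program(_.getOrElseUpdate(k, load(k)))

  def put(k: String, v: String): Program[Unit] = Program(_.update(k, v))

  def clear(k: String): Program[Unit] = Program { s => s.remove(k); () }

  // Building this value performs no cache access at all.
  val program: Program[(Option[String], String, Option[String], Option[String])] =
    for {
      r1 <- fetch("not exist yet")
      r2 <- fetchOrLoad("not exist yet", _ => "default")
      _  <- put("not exist yet", "now exist")
      r3 <- fetch("not exist yet")
      _  <- clear("not exist yet")
      r4 <- fetch("not exist yet")
    } yield (r1, r2, r3, r4)
}
```

Nothing touches the map until `run` receives one. In jujiu that step is split in two: `program(Caffeine().sync)` supplies the cache, and `.unsafeRunSync()` triggers the effects.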

and Cats IO

Jujiu has a very flexible DSL. If you don't like Kleisli, it works directly with IO as well (technically, your effect type only needs an Async instance).

All you need to do is import some syntax and provide a cacheProvider implicitly: since you are not using Kleisli, you need to tell the DSL which cache it will run on.

  "loading cache" >> {
    val c: LoadingCache[IO, cache.LoadingCache, String, String] = new CaffeineLoadingCache[IO, String, String] {}
    implicit val cacheProvider: cache.LoadingCache[String, String] = Caffeine().sync(identity)
    def program =
      for {
        _ <- IO(println("something"))
        r1 <- c.fetchF("1")
        r2 <- c.fetchAllF(List("2", "3"))
        r3 <- c.parFetchAllF[List, IO.Par](List("4", "5"))
      } yield (r1, r2, r3)
    program.unsafeRunSync() must_== (("1", List("2", "3"), List("4", "5")))
  }

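Similar to an ExecutionContext, which provides the threads a Future runs on, the implicit cacheProvider provides the cache these DSL calls run on.

All the DSL methods in this style are suffixed with F (fetchF, fetchAllF, parFetchAllF).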
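The implicit-provider style can be sketched without any dependencies. This is a hypothetical illustration, not jujiu's implementation: `LoadingStore` stands in for a Caffeine loading cache built with an identity loader, and the `fetchF`/`fetchAllF` here return plain values where jujiu's return `F[V]`.

```scala
import scala.collection.mutable

// Hypothetical sketch of the "implicit cacheProvider" style: operations take
// the cache as an implicit parameter, much like Future takes an implicit
// ExecutionContext.
object ImplicitProviderSketch {
  // Stand-in for a loading cache: computes and remembers missing values.
  final class LoadingStore(load: String => String) {
    private val data = mutable.Map.empty[String, String]
    def get(k: String): String = data.getOrElseUpdate(k, load(k))
    def size: Int = data.size
  }

  // "F"-suffixed operations: no Kleisli, the cache comes from implicit scope.
  // (In jujiu these return F[V]; plain values keep this sketch dependency-free.)
  def fetchF(k: String)(implicit cache: LoadingStore): String = cache.get(k)

  def fetchAllF(ks: List[String])(implicit cache: LoadingStore): List[String] =
    ks.map(k => fetchF(k))
}
```

Whichever `LoadingStore` is in implicit scope at the call site is the one the calls run on, just as the implicit `cacheProvider` decides which Caffeine cache `fetchF` uses.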

and Caffeine builder

Dealing with Caffeine's Java builder DSL and Java futures is verbose and painful in a Scala project.

Let's see how Jujiu makes Caffeine friendly to Cats IO as well.

A good example is the async loading cache.

First you will need the Caffeine builder syntax:

import us.oyanglul.jujiu.syntax.caffeine._
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

"it should be able to get and set async loading cache" >> {
  object cache extends CaffeineAsyncLoadingCache[IO, Integer, String] {
    implicit val executionContext: ExecutionContext = global // <-- (executionContext)
  }

  val program = for {
    r1 <- cache.fetch(1)
    r2 <- cache.fetch(2)
    r3 <- cache.fetchAll(List[Integer](1, 2, 3))
  } yield (r1, r2, r3)

  val caffeineA = Caffeine()
    .executionContext(global)           // <-- (global)
    .expire(                            // <-- (expire)
      (_: Integer, _: String) => 1.hour, // hypothetical duration for newly created entries
      (_: Integer, _: String, currentDuration: FiniteDuration) => currentDuration,
      (_: Integer, _: String, currentDuration: FiniteDuration) => currentDuration
    )
    .async((key: Integer) => IO("async string" + key)) // <-- (async)

  val caffeineB = Caffeine()
    .async((key: Integer) => IO("async string" + key))

  val expected = (
    "async string1",
    "async string2",
    List("async string1", "async string2", "async string3")
  )
  program(caffeineA).unsafeRunSync() must_== expected
  program(caffeineB).unsafeRunSync() must_== expected
  // a failing loader surfaces as a failed IO
  program(Caffeine().async((_: Integer) => IO.raiseError[String](new Exception("something wrong"))))
    .unsafeRunSync() must throwA[Exception]
}

  • line-executionContext: an async loading cache needs an ExecutionContext to run the callbacks of its Java futures
  • line-global: .executionContext(global) makes the cache use a Scala ExecutionContext to execute its Java futures; otherwise Caffeine defaults to Java's ForkJoinPool.commonPool(). Alternatively, you can also use Akka's execution context.
  • line-expire: defines the expiry policy in a more Scala-like way, using lambdas and FiniteDuration; the three functions presumably correspond to Caffeine's expireAfterCreate, expireAfterUpdate and expireAfterRead
  • line-async: creates an async loading cache whose loading function is K => IO[V], so you don't need to deal with awful Java futures
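Under the hood, an async loading cache keeps one Java future per key and hands Scala code a friendlier view of it. The sketch below shows that bridging with only the standard library (Scala 2.13+, for `scala.jdk.FutureConverters`). `TinyAsyncLoadingCache` is hypothetical, uses `Future` where jujiu uses `IO`, and, unlike Caffeine, keeps failed loads cached.

```scala
import java.util.concurrent.{CompletionStage, ConcurrentHashMap}
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.FutureConverters._

// Hypothetical sketch: the Java side stores one CompletionStage per key,
// the Scala side only writes a loader K => Future[V] and gets Futures back.
final class TinyAsyncLoadingCache[K, V](load: K => Future[V]) {
  private val store = new ConcurrentHashMap[K, CompletionStage[V]]()

  // computeIfAbsent is atomic, so the loader runs at most once per key.
  def fetch(k: K): Future[V] =
    store.computeIfAbsent(k, (key: K) => load(key).asJava).asScala

  // Combining several futures needs an ExecutionContext.
  def fetchAll(ks: List[K])(implicit ec: ExecutionContext): Future[List[V]] =
    Future.traverse(ks)(k => fetch(k))
}
```

The `asJava`/`asScala` pair is the conversion that jujiu's `.async` spares you from writing by hand.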

Works with Tagless Final

No matter what style of effect abstraction your project is using, Jujiu can easily fit in.

For example, Tagless Final:

"works with tagless final" >> {
  import us.oyanglul.jujiu.syntax.cache._
  trait LogDsl[F[_]] {
    def log(msg: String): F[Unit]
  }

  type ProgramDsl[F[_]] = CaffeineCache[F, String, String] with LogDsl[F]

  def program[F[_]: Async](dsl: ProgramDsl[F])(
    implicit ev: cache.Cache[String, String]
  ): F[Option[String]] =
    for {
      value <- dsl.fetchF("key")
      _ <- dsl.log("something")
    } yield value

  object dsl extends CaffeineCache[IO, String, String] with LogDsl[IO] {
    def log(msg: String): IO[Unit] = IO(org.log4s.getLogger.info(msg))
  }

  implicit val cacheProvider: cache.Cache[String, String] = Caffeine().sync[String, String]

  program[IO](dsl).unsafeRunSync() must_== None
}

Just extend CaffeineCache[F, K, V] and provide an implicit cacheProvider.
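Stripped of cats and Caffeine, the tagless-final shape above is business logic written against capability traits and a type class for `F`, plus one interpreter per effect. This sketch is hypothetical: it hand-rolls a minimal `Monad` (cats provides the real one) and interprets the program in the identity effect `Id` with an in-memory map.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, dependency-free sketch of the tagless-final pattern.
object TaglessSketch {
  // Minimal hand-rolled type class; cats' Monad/Async play this role in jujiu.
  trait Monad[F[_]] {
    def pure[A](a: A): F[A]
    def flatMap[A, B](fa: F[A])(f: A => F[B]): F[B]
  }

  type Id[A] = A

  implicit val idMonad: Monad[Id] = new Monad[Id] {
    def pure[A](a: A): Id[A] = a
    def flatMap[A, B](fa: Id[A])(f: A => Id[B]): Id[B] = f(fa)
  }

  // The capabilities the business logic depends on.
  trait CacheDsl[F[_]] { def fetchF(k: String): F[Option[String]] }
  trait LogDsl[F[_]] { def log(msg: String): F[Unit] }

  // Business logic: written once, for any F with a Monad.
  def program[F[_]](dsl: CacheDsl[F] with LogDsl[F])(implicit M: Monad[F]): F[Option[String]] =
    M.flatMap(dsl.fetchF("key"))(value => M.flatMap(dsl.log("something"))(_ => M.pure(value)))

  // One interpreter: an in-memory cache plus a log that records messages.
  final class InMemoryDsl(data: Map[String, String]) extends CacheDsl[Id] with LogDsl[Id] {
    val logs: ListBuffer[String] = ListBuffer.empty
    def fetchF(k: String): Id[Option[String]] = data.get(k)
    def log(msg: String): Id[Unit] = { logs += msg; () }
  }
}
```

Swapping `Id` for `IO` and `InMemoryDsl` for a Caffeine-backed object leaves `program` unchanged, which is the point of the pattern.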

ReaderT Pattern

If your code uses the ReaderT pattern, good: Jujiu will fit in even more naturally.

"works with tagless final style readerT" >> {
  // Layer 1: Environment
  trait HasLogger {
    def logger: org.log4s.Logger
  }
  trait HasCacheProvider {
    def cacheProvider: cache.Cache[String, String]
  }

  type Env = HasLogger with HasCacheProvider

  // Layer 2: DSL
  trait LogDsl[F[_]] {
    def log(msg: String)(implicit M: Applicative[F]): Kleisli[F, Env, Unit] =
      Kleisli(a => M.pure(a.logger.info(msg)))
  }

  type Dsl[F[_]] = CaffeineCache[F, String, String] with LogDsl[F]

  // Layer 3: Business
  def program[F[_]](dsl: Dsl[F])(
    implicit ev: Async[F]
  ) =
    for {
      _ <- dsl.log("something")
      value <- dsl.fetch("key").local[Env](_.cacheProvider) // narrow Env to the Cache fetch needs
    } yield value

  object dsl extends CaffeineCache[IO, String, String] with LogDsl[IO]

  program(dsl)
    .run(new HasLogger with HasCacheProvider {
      def logger = org.log4s.getLogger
      def cacheProvider = Caffeine().sync[String, String]
    })
    .unsafeRunSync() must_== None
}

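Notice that adapting the environment requires .local[Env](_.cacheProvider): fetch only needs a Cache, while the program runs on the whole Env, so local adapts the Kleisli's input contravariantly by extracting the cache from Env.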
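The sketch below shows why `local` is the right tool, using a hypothetical `Reader` in place of cats' `Kleisli` (the effect is dropped to stay dependency-free). `fetch` only needs the cache, whereas the program's environment is the whole `Env`; `local` adapts the input in the opposite direction of `map`, reading the cache out of `Env` before running.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical sketch: Reader[R, A] plays the role of Kleisli[F, R, A].
object LocalSketch {
  final case class Reader[R, A](run: R => A) {
    def map[B](f: A => B): Reader[R, B] = Reader(r => f(run(r)))
    def flatMap[B](f: A => Reader[R, B]): Reader[R, B] = Reader(r => f(run(r)).run(r))
    // Contravariant adaptation: obtain the R this reader needs from a bigger R2.
    def local[R2](f: R2 => R): Reader[R2, A] = Reader(r2 => run(f(r2)))
  }

  trait HasLogger { def logs: ListBuffer[String] }
  trait HasCacheProvider { def cacheProvider: Map[String, String] }
  type Env = HasLogger with HasCacheProvider

  // Logging needs the whole Env ...
  def log(msg: String): Reader[Env, Unit] = Reader(env => { env.logs += msg; () })
  // ... but the cache operation only needs the cache itself.
  def fetch(k: String): Reader[Map[String, String], Option[String]] = Reader(_.get(k))

  val program: Reader[Env, Option[String]] =
    for {
      _     <- log("something")
      value <- fetch("key").local[Env](_.cacheProvider)
    } yield value
}
```

Without `.local`, the two steps would not compose, because their input types (`Env` and `Map[String, String]`) differ.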


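Because it is built on Kleisli, the DSL is extensible by design: if you provide another cache provider, the same DSL will work. For example, the following runs the same program on Redis through Jedis (it expects a Redis server on localhost):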

"run on redis" >> {
  import redis.clients.jedis._

  def program[F[_]: Async, S[_, _]](dsl: Cache[F, S, String, String]) = for {
    r1 <- dsl.fetch("not exist yet")
    r2 <- dsl.fetch("not exist yet", _ => Async[F].delay("default"))
    _ <- dsl.put("not exist yet", "now exist")
    r3 <- dsl.fetch("not exist yet")
    _ <- dsl.clear("not exist yet")
    r4 <- dsl.fetch("not exist yet")
  } yield (r1, r2, r3, r4)

  // Jedis ignores the key/value type parameters, so adapt it to the S[_, _] shape
  type J[A, B] = Jedis
  object dsl extends Cache[IO, J, String, String] {
    def put(k: String, v: String)(implicit M: Async[IO]): Kleisli[IO, Jedis, Unit] =
      Kleisli { redis =>
        M.delay {
          redis.set(k, v)
          ()
        }
      }
    def fetch(k: String)(implicit M: Async[IO]): Kleisli[IO, Jedis, Option[String]] =
      Kleisli(redis => M.delay(Option(redis.get(k))))
    def clear(k: String)(implicit M: Async[IO]): Kleisli[IO, Jedis, Unit] =
      Kleisli(redis => M.delay {
        redis.del(k)
        ()
      })
  }

  program[IO, J](dsl)
    .run(
      new Jedis("localhost")
    ).unsafeRunSync() must_== ((None, "default", Some("now exist"), None))
}
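The same idea can be sketched without Redis or cats: the program is written once against a DSL, and each backend only interprets the primitive operations. In this hypothetical sketch `Prog` stands in for `Kleisli`, while a Scala `mutable.Map` and a Java `ConcurrentHashMap` stand in for Caffeine and Jedis; `fetchOrLoad` is an illustrative name, not jujiu's API.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

// Hypothetical sketch of "same DSL, different cache provider".
object PluggableBackendSketch {
  // Prog[S, A] ~ Kleisli[F, S, A] minus the effect; S is the backend.
  final case class Prog[S, A](run: S => A) {
    def map[B](f: A => B): Prog[S, B] = Prog(s => f(run(s)))
    def flatMap[B](f: A => Prog[S, B]): Prog[S, B] = Prog(s => f(run(s)).run(s))
  }

  // The DSL fixes the primitive operations; each backend interprets them.
  trait CacheDsl[S] {
    def put(k: String, v: String): Prog[S, Unit]
    def fetch(k: String): Prog[S, Option[String]]
    def clear(k: String): Prog[S, Unit]

    // Derived once for every backend: on a miss, load, store and return.
    def fetchOrLoad(k: String, load: String => String): Prog[S, String] =
      fetch(k).flatMap[String] {
        case Some(v) => Prog[S, String](_ => v)
        case None =>
          val v = load(k)
          put(k, v).map(_ => v)
      }
  }

  type ScalaMap = mutable.Map[String, String]
  object ScalaMapBackend extends CacheDsl[ScalaMap] {
    def put(k: String, v: String): Prog[ScalaMap, Unit] = Prog(_.update(k, v))
    def fetch(k: String): Prog[ScalaMap, Option[String]] = Prog(_.get(k))
    def clear(k: String): Prog[ScalaMap, Unit] = Prog { s => s.remove(k); () }
  }

  type JavaMap = ConcurrentHashMap[String, String]
  object JavaMapBackend extends CacheDsl[JavaMap] {
    def put(k: String, v: String): Prog[JavaMap, Unit] = Prog { s => s.put(k, v); () }
    def fetch(k: String): Prog[JavaMap, Option[String]] = Prog(s => Option(s.get(k)))
    def clear(k: String): Prog[JavaMap, Unit] = Prog { s => s.remove(k); () }
  }

  // Written once against the DSL, runnable on any backend.
  def program[S](dsl: CacheDsl[S]): Prog[S, (Option[String], String, Option[String], Option[String])] =
    for {
      r1 <- dsl.fetch("not exist yet")
      r2 <- dsl.fetchOrLoad("not exist yet", _ => "default")
      _  <- dsl.put("not exist yet", "now exist")
      r3 <- dsl.fetch("not exist yet")
      _  <- dsl.clear("not exist yet")
      r4 <- dsl.fetch("not exist yet")
    } yield (r1, r2, r3, r4)
}
```

Adding a third backend only means implementing `put`, `fetch` and `clear`; `program` and `fetchOrLoad` stay untouched.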