Correlation IDs in Scala using Monix by Adam Warski

Do you use correlation IDs together with Monix? These unique identifiers can span numerous requests, events or messages, and are particularly useful when developing microservice-based systems. Adam Warski, CTO at SoftwareMill, goes into extensive detail on how they can be implemented and more. Read on and become a correlation ID expert!

Correlation IDs are unique identifiers attached to a single “business transaction”, which can span a number of requests, events or messages. Such identifiers are most useful when developing microservice-based systems, to trace the request flow graph. But the use-cases are not limited to development: distributed tracing might be useful in production as well. Systems such as Zipkin allow you to collect, visualise and manage the gathered traces.

In synchronous frameworks, correlation IDs are typically implemented by setting a 'ThreadLocal' value while the request, message or event is being processed. This value can then be read by loggers, messaging and HTTP libraries. However, implementing support for correlation IDs in an asynchronous setup (e.g. when using 'Future's) has always been challenging: how do you reliably pass the correlation ID across thread boundaries?
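A minimal, standard-library-only sketch of the problem: a value set in a plain 'ThreadLocal' on the calling thread is not visible inside a 'Future' body, which runs on a pool thread. The names 'ThreadLocalLossDemo' and 'currentCid' are illustrative only.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ThreadLocalLossDemo {
  // A plain ThreadLocal, as a synchronous framework might use to hold the correlation ID
  val currentCid: ThreadLocal[Option[String]] = new ThreadLocal[Option[String]] {
    override def initialValue(): Option[String] = None
  }

  // Returns (ID seen on the calling thread, ID seen inside a Future body)
  def run(): (Option[String], Option[String]) = {
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      currentCid.set(Some("cid-123"))
      val onCaller = currentCid.get()
      // The Future body runs on a pool thread, where the ThreadLocal was never set
      val inFuture = Await.result(Future(currentCid.get()), 5.seconds)
      (onCaller, inFuture)
    } finally {
      currentCid.remove()
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The pool thread falls back to the 'ThreadLocal''s initial value, so the correlation ID is silently lost as soon as work hops threads.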

One possibility is, of course, to add an additional parameter to each method, log statement etc.; however, I think we will all agree that such an approach is not very practical. In Scala, this can be done using an implicit parameter, but still, all methods involved need to be correlation-ID aware. Is there any hope?
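To illustrate why the implicit-parameter approach does not scale, here is a small sketch; 'CorrelationId', 'handleRequest', 'loadUser' and 'log' are hypothetical names. Even though callers never pass the ID explicitly, every method on the call path still has to declare it.

```scala
object ImplicitCidDemo {
  // Hypothetical wrapper type, so that an unrelated implicit String is never picked up
  final case class CorrelationId(value: String)

  // Every method on the call path has to declare the implicit parameter...
  def handleRequest(userId: Int)(implicit cid: CorrelationId): String =
    s"${log("handling request")}; ${loadUser(userId)}"

  def loadUser(userId: Int)(implicit cid: CorrelationId): String =
    log(s"loading user $userId")

  // ...down to the logging helper, which is the only place that actually uses it
  def log(msg: String)(implicit cid: CorrelationId): String = s"[${cid.value}] $msg"

  def main(args: Array[String]): Unit = {
    implicit val cid: CorrelationId = CorrelationId("cid-42")
    println(handleRequest(7))
  }
}
```

Adding a new layer between 'handleRequest' and 'log' means touching its signature too, which is exactly the "correlation-ID aware" burden described above.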

Constructs known from functional programming might bring a solution, specifically separating the description of effectful operations from their evaluation. Unlike 'Future's, which are evaluated eagerly upon construction, wrapper types such as 'IO's and 'Task's evaluate lazily, on demand, driven by an evaluator.
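The eager/lazy distinction can be observed with the standard library alone. In this sketch a plain function '() => A' stands in for a lazy wrapper; it is not Monix's 'Task', just the simplest possible "description" of a computation.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object EagerVsLazyDemo {
  // Returns (Future runs after construction, thunk runs before invocation, thunk runs after)
  def run(): (Int, Int, Int) = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val futureRuns = new AtomicInteger(0)
    val thunkRuns = new AtomicInteger(0)

    // A Future is scheduled for execution as soon as it is constructed
    val eager: Future[Int] = Future(futureRuns.incrementAndGet())
    // A thunk runs only when somebody explicitly invokes it
    val description: () => Int = () => thunkRuns.incrementAndGet()

    Await.ready(eager, 5.seconds) // only wait for completion; nobody asked to "run" it
    val afterConstruction = (futureRuns.get(), thunkRuns.get())
    description() // explicitly evaluate the description
    (afterConstruction._1, afterConstruction._2, thunkRuns.get())
  }

  def main(args: Array[String]): Unit = println(run())
}
```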

For some motivation on why to use wrappers in the first place, and more background on the 'Task' execution model, see the “Why wrestle with wrappers?” blog post. For a comparison between 'Future' & actor-based computations and Monix/ZIO-based ones, see this blog series.

A 'Task' is only a description of a computation. Upon construction, nothing happens, and no “business logic” is executed; only the information on the structure of the computation (e.g. what to run sequentially, and what to run in parallel) is created, as a data structure. This description can then be evaluated step-by-step into a running computation (a 'Future') using an interpreter/evaluator.
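To make "a description is just a data structure" concrete, here is a deliberately tiny toy effect type with its own interpreter. 'MiniTask' and its constructors are hypothetical and far simpler than Monix's 'Task' (no asynchrony, no error handling, and the recursive 'run' is not stack-safe).

```scala
import scala.collection.mutable.ListBuffer

// A toy effect type: building a value only builds a data structure
sealed trait MiniTask[A] {
  def flatMap[B](f: A => MiniTask[B]): MiniTask[B] = MiniTask.FlatMap(this, f)
  def map[B](f: A => B): MiniTask[B] = flatMap(a => MiniTask.Pure(f(a)))
}

object MiniTask {
  final case class Pure[A](value: A) extends MiniTask[A]
  final case class Delay[A](thunk: () => A) extends MiniTask[A]
  final case class FlatMap[A, B](source: MiniTask[A], f: A => MiniTask[B]) extends MiniTask[B]

  // Suspends a side effect without running it
  def apply[A](a: => A): MiniTask[A] = Delay(() => a)

  // The interpreter: walks the description and only now runs the side effects
  def run[A](task: MiniTask[A]): A = task match {
    case Pure(a)              => a
    case Delay(thunk)         => thunk()
    case fm: FlatMap[x, A] => run(fm.f(run(fm.source)))
  }
}

object MiniTaskDemo {
  // Returns (effects before running, effects after running, result)
  def run(): (List[String], List[String], Int) = {
    val effects = ListBuffer.empty[String]
    val read: MiniTask[Int] = MiniTask { effects += "read"; 20 }
    val program: MiniTask[Int] = read.flatMap(x => MiniTask { effects += "compute"; x + 22 })
    val beforeRun = effects.toList // still empty: program is only a data structure
    val result = MiniTask.run(program)
    (beforeRun, effects.toList, result)
  }
}
```

Because the interpreter sees every 'flatMap' step, it is the natural place to hook in extra behaviour such as carrying a correlation ID from one step to the next.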

Such an evaluator might not only take care of running the computation, but also propagate the correlation IDs.

And that’s the case for Monix, which includes 'Local', a construct that allows propagating a value across 'Task' (and thread) boundaries when evaluating 'Task's. Quoting the scaladoc:

A 'Local' is a 'ThreadLocal' whose scope is flexible. The state of all Locals may be saved or restored onto the current thread by the user. This is useful for threading Locals through execution contexts.
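The "save or restore" idea can be sketched in plain Scala. This is a toy, not Monix's implementation: 'ToyLocal', 'ToyLocalContext' and 'ToyEvaluator.shift' are hypothetical names. All locals share one immutable map held by a single 'ThreadLocal'; the evaluator snapshots that map before crossing a thread boundary and restores it on the other side.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors}

// Toy context store: every ToyLocal's value lives in one immutable map per thread,
// so the whole context can be captured on one thread and installed on another
object ToyLocalContext {
  type Context = Map[ToyLocal[_], Any]
  private val current = new ThreadLocal[Context] {
    override def initialValue(): Context = Map.empty
  }
  def snapshot(): Context = current.get()
  def restore(ctx: Context): Unit = current.set(ctx)
}

final class ToyLocal[A](default: A) {
  def apply(): A = ToyLocalContext.snapshot().getOrElse(this, default).asInstanceOf[A]
  def update(value: A): Unit =
    ToyLocalContext.restore(ToyLocalContext.snapshot().updated(this, value))
}

object ToyEvaluator {
  // Runs `body` on `pool` with the caller's context installed, then waits for the result
  def shift[A](pool: ExecutorService)(body: => A): A = {
    val ctx = ToyLocalContext.snapshot() // save on the current thread
    pool.submit(new Callable[A] {
      def call(): A = {
        val previous = ToyLocalContext.snapshot()
        ToyLocalContext.restore(ctx) // restore on the worker thread
        try body finally ToyLocalContext.restore(previous)
      }
    }).get()
  }
}

object ToyLocalDemo {
  val cid = new ToyLocal[Option[String]](None)

  // Returns (ID on caller, ID after a propagating shift, ID after a plain submit)
  def run(): (Option[String], Option[String], Option[String]) = {
    val pool = Executors.newSingleThreadExecutor()
    try {
      cid.update(Some("cid-7"))
      val withPropagation = ToyEvaluator.shift(pool)(cid())
      // Plain submission without save/restore: the worker thread only sees the default
      val withoutPropagation =
        pool.submit(new Callable[Option[String]] { def call(): Option[String] = cid() }).get()
      (cid(), withPropagation, withoutPropagation)
    } finally pool.shutdown()
  }
}
```

Monix performs the equivalent bookkeeping inside the 'Task' run loop, so user code never calls anything like 'shift' by hand.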

Local context propagation is disabled by default. It can be enabled by setting an option on a 'Task' instance:

.executeWithOptions(_.localContextPropagation)

or globally using a JVM option:

-Dmonix.environment.localContextPropagation=1
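If the servers are launched through sbt (an assumption; they can just as well be started another way), one option is to set the flag in 'build.sbt'. A minimal sketch for sbt 1.x: 'javaOptions' only reach the application when it runs in a forked JVM, hence 'fork := true'.

```scala
// build.sbt: run the application in a separate JVM so that javaOptions take effect
fork := true
javaOptions += "-Dmonix.environment.localContextPropagation=1"
```

Keeping the flag in the build makes it harder to forget, which matters here because the PoC relies on the global JVM option.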

How to use this mechanism to implement support for correlation IDs, and more importantly, does it work? (spoiler: yes, at least in the PoC)

1 client & 3 servers propagating requests

The monix-correlation-id repository contains a PoC of implementing correlation ID support using Monix. It contains three simple HTTP servers, which call each other in a chain, log some information and read data from a database. That way we can verify whether the correlation ID is properly passed in a variety of situations.

The following libraries are used:

The first three work with any datatype for which there’s an implementation of the cats-effect typeclasses. Monix’s 'Task' is one of them.

Warning: the code below includes global state and unwrapped, side-effecting method calls. Proceed with caution!

As we said earlier, we’ll be using a 'Local' instance. It will store an 'Option[String]' with the current correlation ID (if any):

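import com.typesafe.scalalogging.StrictLogging
import monix.execution.misc.Local

object CorrelationId extends StrictLogging {
  // Holds the correlation ID of the request being processed; None by default
  private val localCid = new Local[Option[String]](() => None)

  // Reads the correlation ID from the current (propagated) local context
  def apply(): Option[String] = localCid()
}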

When receiving an HTTP request (via http4s), we try to read the correlation ID from the 'X-Correlation-ID' header or, if it’s not set, create a new one (a random string). This can be implemented as an http4s middleware:

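import cats.data.Kleisli
import monix.eval.Task
import org.http4s.{HttpRoutes, Request}
import org.http4s.util.CaseInsensitiveString

// Continuation of the CorrelationId object above (localCid and logger are defined there)
object CorrelationId {
  val CorrelationIdHeader = "X-Correlation-ID"
  
  def setCorrelationIdMiddleware(service: HttpRoutes[Task]): HttpRoutes[Task] = 
    Kleisli { req: Request[Task] =>
      // Reuse the incoming ID if present; newCorrelationId() (not shown) creates a random string
      val cid = req.headers.get(CaseInsensitiveString(CorrelationIdHeader)) match {
        case None => newCorrelationId()
        case Some(cidHeader) => cidHeader.value
      }

      // Store the ID in the Local, so that code evaluated for this request can read it
      localCid.update(Some(cid))
      logger.info("Starting request")
      service(req)
    }
}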
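The decision the middleware makes (reuse the incoming ID, or generate a fresh one) can be isolated as a pure function and unit-tested without starting a server. A sketch with hypothetical names ('CidResolution', 'resolveCid', 'randomCid'); headers are modelled as plain name/value pairs instead of http4s 'Headers', and a truncated UUID is just one possible choice of "random string".

```scala
import java.util.UUID

object CidResolution {
  val CorrelationIdHeader = "X-Correlation-ID"

  // Hypothetical generator: the PoC only specifies "a random string"
  def randomCid(): String = UUID.randomUUID().toString.take(8)

  // Reuse the incoming header value if present (HTTP header names are case-insensitive),
  // otherwise fall back to a freshly generated ID
  def resolveCid(headers: Seq[(String, String)], newCid: () => String = () => randomCid()): String =
    headers.collectFirst { case (name, value) if name.equalsIgnoreCase(CorrelationIdHeader) => value }.getOrElse(newCid())
}
```

Passing the generator as a parameter keeps the function deterministic in tests while defaulting to random IDs in production.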

Services that want to use correlation IDs must be wrapped with the middleware, for example as follows:

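import com.typesafe.scalalogging.StrictLogging
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import org.http4s.HttpRoutes
import org.http4s.dsl.Http4sDsl
import org.http4s.implicits._
import org.http4s.server.blaze.BlazeServerBuilder

object Server1 extends App with StrictLogging {
  val dsl = Http4sDsl[Task]
  import dsl._

  val service = HttpRoutes.of[Task] {
    case GET -> Root / "test1" => Ok()
  }

  // Wrap the routes with the middleware, serve on port 8081 and block until shutdown
  BlazeServerBuilder[Task]
    .bindHttp(8081)
    .withHttpApp(CorrelationId.setCorrelationIdMiddleware(service).orNotFound)
    .serve
    .compile
    .drain
    .runSyncUnsafe()
}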

This takes care of the request-receiving side (the server). What about request sending (the client)? As we mentioned earlier, we’ll be using sttp to send HTTP requests. Here, we need to set the correlation ID header on outgoing requests. To do this, we create a wrapper for an sttp backend:

import com.softwaremill.sttp
import com.softwaremill.sttp.{MonadError, Response, SttpBackend}
import monix.eval.Task
import CorrelationId.CorrelationIdHeader

// Decorates any Task-based sttp backend, adding the current correlation ID to each request
class SetCorrelationIdBackend(delegate: SttpBackend[Task, Nothing]) 
  extends SttpBackend[Task, Nothing] {
    
  override def send[T](request: sttp.Request[T, Nothing]): Task[Response[T]] = {
    // suspending the calculation of the correlation id until the request 
    // send is evaluated
    Task {
      CorrelationId() match {
        case Some(cid) => request.header(CorrelationIdHeader, cid)
        case None => request
      }
    }.flatMap(delegate.send)
  }

  override def close(): Unit = delegate.close()

  override def responseMonad: MonadError[Task] = delegate.responseMonad
}

Having such a wrapped backend in scope as an implicit, every HTTP request sent through it will include the current correlation ID. This is as simple as invoking, e.g., 'sttp.get(uri"http://localhost:8082/test2").send()', which yields a 'Task[Response[String]]'.
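The important detail in 'SetCorrelationIdBackend' is that the ID is read when a request is sent, not when the backend is created. A library-free sketch of the same decorator idea; 'Req', 'Backend', 'SetCidBackend' and 'echo' are hypothetical stand-ins, not sttp types, and 'currentCid' plays the role of 'CorrelationId()'.

```scala
object BackendDecoratorDemo {
  // Hypothetical, simplified stand-ins for an HTTP request and a client backend
  final case class Req(url: String, headers: Map[String, String] = Map.empty)
  trait Backend { def send(req: Req): String }

  // Decorator: looks up the current correlation ID on every send call
  // and adds it as a header before delegating to the wrapped backend
  final class SetCidBackend(delegate: Backend, currentCid: () => Option[String]) extends Backend {
    def send(req: Req): String = {
      val withCid = currentCid() match {
        case Some(cid) => req.copy(headers = req.headers + ("X-Correlation-ID" -> cid))
        case None      => req
      }
      delegate.send(withCid)
    }
  }

  // A fake backend that echoes the correlation ID header it received
  val echo: Backend = new Backend {
    def send(req: Req): String = req.headers.getOrElse("X-Correlation-ID", "<none>")
  }
}
```

A single backend instance can therefore be shared by all requests; each call picks up whatever ID is current at that moment.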

Finally, what about logging? The “official” way to pass additional data to log statements is MDC; however, this mechanism relies on thread locals, so it can't be used directly in our case. I couldn’t find another way to intercept log events, so we’ll abuse logback’s 'Filter' and set the thread-local 'MDC' variable there. This, in turn, makes the correlation ID available in logging patterns:

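package com.softwaremill.monix

import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.filter.Filter
import ch.qos.logback.core.spi.FilterReply
import org.slf4j.MDC

// Invoked for each event before the appender writes it: copies the current correlation ID
// (if any) into the thread-local MDC, where the %X{cid} pattern can read it
class SetCorrelationIdInMDCFilter extends Filter[ILoggingEvent] {
  override def decide(event: ILoggingEvent): FilterReply = {
    CorrelationId().foreach(MDC.put("cid", _)) 
    FilterReply.NEUTRAL
  }
}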

The 'cid' field can be then used in the log message pattern, if it is set:

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
  <encoder>
    <pattern>%d{HH:mm:ss.SSS} %highlight(%replace([%X{cid}] ){'\[\] ', ''})[%thread] %-5level %logger{5} - %msg%n</pattern>
  </encoder>

  <filter class="com.softwaremill.monix.SetCorrelationIdInMDCFilter" />
</appender>
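logback's '%replace(p){r, t}' converter replaces matches of the regular expression 'r' in the output of 'p' with 't'. The plain-Scala sketch below mimics what the fragment above computes, using 'String.replaceAll' as a stand-in for the converter; 'CidPatternDemo' and 'renderPrefix' are hypothetical names.

```scala
object CidPatternDemo {
  // Mimics %replace([%X{cid}] ){'\[\] ', ''}: render "[<cid>] " first, then delete
  // the literal "[] " that remains when no cid is present in the MDC
  def renderPrefix(cid: Option[String]): String =
    s"[${cid.getOrElse("")}] ".replaceAll("""\[\] """, "")
}
```

So lines logged outside of any request carry no empty "[] " prefix, while request-scoped lines start with the bracketed ID.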

And that’s it. Provided that you start the servers from the PoC repository with the

-Dmonix.environment.localContextPropagation=1

option, the correlation ID will be automatically propagated across 'Task's in both synchronous and asynchronous operations, and hence also across thread boundaries.

Server1 in action. Note that a single request is served by multiple threads.

To verify this for yourself, start the 'Server1', 'Server2' and 'Server3' applications (which bind to ports '8081', '8082' and '8083', respectively; be sure to add the context propagation option!), and then issue a single request to 'Server1' by running the 'Client' class. You should see log messages with the (same) correlation ID highlighted in each server. The response to the request will consist of the correlation IDs as seen at various stages of evaluating the call chain: propagated HTTP calls, parallel operations (run using 'Task.gatherUnordered') and database queries. Making another request will use a different correlation ID.

Moreover, you can run a test which issues a number of parallel requests using the 'StressClient' class. The result of each call will be verified to check that it contains only the appropriate correlation IDs.
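The verification idea behind 'StressClient' (many concurrent requests, each checking that it only ever observes its own correlation ID) can be sketched without any servers. This sketch swaps Monix for standard-library 'Future's plus a hypothetical context-propagating 'ExecutionContext' wrapper ('PropagatingEC'); it illustrates the check, not the PoC's actual code.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object StressCheckDemo {
  private val cid = new ThreadLocal[Option[String]] {
    override def initialValue(): Option[String] = None
  }

  // Hypothetical wrapper: each task captures the submitting thread's correlation ID
  // and re-installs it on the worker thread for the duration of the task
  final class PropagatingEC(underlying: ExecutionContext) extends ExecutionContext {
    def execute(task: Runnable): Unit = {
      val captured = cid.get()
      underlying.execute(new Runnable {
        def run(): Unit = {
          val previous = cid.get()
          cid.set(captured)
          try task.run() finally cid.set(previous)
        }
      })
    }
    def reportFailure(t: Throwable): Unit = underlying.reportFailure(t)
  }

  // One simulated request: records the ID observed at sequential and parallel async steps
  private def request()(implicit ec: ExecutionContext): Future[List[Option[String]]] =
    for {
      a <- Future(cid.get())
      b <- Future(cid.get())
      c <- Future.sequence(List(Future(cid.get()), Future(cid.get())))
    } yield a :: b :: c

  // Starts n concurrent requests with distinct IDs; true if each one only saw its own ID
  def run(n: Int, propagate: Boolean): Boolean = {
    val pool = Executors.newFixedThreadPool(8)
    val base = ExecutionContext.fromExecutorService(pool)
    implicit val ec: ExecutionContext = if (propagate) new PropagatingEC(base) else base
    try {
      val checks = (1 to n).toList.map { i =>
        val id = Some(s"cid-$i")
        cid.set(id)
        request().map(seen => seen.forall(_ == id))
      }
      cid.remove()
      Await.result(Future.sequence(checks), 30.seconds).forall(identity)
    } finally pool.shutdown()
  }
}
```

Running the same check with propagation disabled fails, which confirms the test would actually catch a broken propagation mechanism.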

Finally, the 'SingleServer' class contains a more compact setup, which starts a single server and makes a single HTTP request.

Notes/TODOs

The code is of PoC-quality, and as such has some improvement possibilities, notably:

  • in 'SetCorrelationIdInMDCFilter', the thread-local MDC variable is set but never cleared, as the filter is only run before a message is logged. Probably a different logback mechanism would have to be used to do this properly.
  • similarly, in 'setCorrelationIdMiddleware', the correlation ID is set in the 'Local' but never cleared.
  • only enabling local context propagation globally, using the JVM option, works. Evaluating the main task with the modified options (setting them just before 'runSyncUnsafe') seems to have no effect. My guess is that http4s also evaluates 'Task's to 'Future's somewhere internally, this time with the default options.
  • finally, the demo uses http4s version 0.20.0-M3, a milestone release that is not production-ready. A stable version can’t be used, as Monix 3.0.0-RC2, which includes a number of fixes for 'Local's, depends on a newer version of cats-effect, which is not compatible with the one used by http4s 0.18.
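Both "never cleared" items come down to the same pattern: set a value for the duration of one unit of work and restore the previous value afterwards. Below is a thread-local sketch of that pattern with hypothetical names ('ScopedCidDemo', 'withCid'). It only covers code that stays on one thread; for 'Task's, the equivalent would have to be expressed inside the 'Task' itself (e.g. with a bracket-style construct), which is an assumption not tested in the PoC.

```scala
object ScopedCidDemo {
  private val cid = new ThreadLocal[Option[String]] {
    override def initialValue(): Option[String] = None
  }

  def current(): Option[String] = cid.get()

  // Sets the ID only while `body` runs and restores the previous value afterwards,
  // even if `body` throws, so nothing leaks into later work on the same thread
  def withCid[A](id: String)(body: => A): A = {
    val previous = cid.get()
    cid.set(Some(id))
    try body finally cid.set(previous)
  }
}
```

Restoring the previous value, rather than unconditionally clearing it, also keeps nested scopes correct.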

Summary

Support for correlation IDs is one more area where separating program description from evaluation can be beneficial. The description of the program, represented as a 'Task', knows nothing about how and in what context it will be evaluated; and rightfully so, as it doesn’t need this kind of information.

It’s the evaluator, which interprets the lazy 'Task' into a running 'Future', that tracks the correlation IDs and propagates them correctly across task and thread boundaries, one 'flatMap' at a time.

This article was written by Adam Warski and originally posted on blog.softwaremill.com