LendUp's Engineering blog published this post on 'A Functional Interface for Key/Value Store', which we thought we would share with you. Happy reading!
At LendUp, we need to run multiple pipelines that transfer batched data across different systems via SFTP and Amazon S3 and, very commonly, store temporary data transformations on the local filesystem. All of these storage mechanisms behave very similarly and can be modelled by a common interface.
Initial implementations that used a specific client library for each storage mechanism gradually drifted towards bad software development practices: code became hard to test (e.g. tests required S3/SFTP credentials), and performance and scalability were limited by host memory or filesystem capacity.
By working against a generic Store interface, abstracted from any specific storage mechanism, we can swap in different implementations depending on the calling context. This makes it very easy to provide mocks in unit tests, replacing SFTP/S3 stores with a filesystem store in a controlled environment.
fs2-blobstore is a minimal, idiomatic, stream-based Scala interface for key/value store implementations, with the assumption that any key/value store must provide these six basic functions:
- List keys
- Get key’s value
- Put value in key
- Copy key value to some other key
- Move key value to some other key
- Remove key
fs2-blobstore provides scalability by relying on fs2 streams' resource safety and concurrency primitives, while offering a flexible abstraction that applies to different storage mechanisms.
Stream-based Store
One of the main goals for fs2-blobstore is to provide scalable code that allows us to grow our business from thousands to millions of users. As our user base grows, our data pipelines will need to process larger and larger files.
The first way to provide scale in a data pipeline is, of course, to avoid loading incoming data files into memory, but also to avoid writing to disk as much as possible, especially for intermediate steps.
The initial pipeline implementation made heavy use of temporary local disk storage before uploading files to their permanent S3 location for archiving. While disks are cheap these days, using local storage as a staging area for every file transferred through the pipeline would eventually limit our ability to process files concurrently: the temporary storage fills up with intermediate transformations of the data, and processing time increases as data is read from and written to the filesystem.
With a stream-based store interface built on Functional Streams for Scala (fs2), we can transfer the stream of bytes coming from SFTP directly into the stream of bytes going to S3, without ever storing those bytes on the local filesystem. There are times when the local filesystem is required for intermediate storage, but by avoiding unnecessary writes we allow for larger scale overall.
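For example, a minimal sketch of that idea (here sftpStore and s3Store are assumed to be Store[F] instances backed by SFTP and S3, and path a Path, all already in scope):

// Pipe bytes straight from the SFTP-backed store into the S3-backed store,
// 4k at a time, without touching the local filesystem.
val transfer: fs2.Stream[F, Unit] = sftpStore.get(path, 4096) to s3Store.put(path)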
Modelling a Key/Value Store
As mentioned above, fs2-blobstore models a very specific type of store, from Path to bytes, and defines a strong contract that store implementations are expected to honour.
The key is modeled as:
case class Path(
  root: String,
  key: String,
  size: Option[Long],
  isDir: Boolean,
  lastModified: Option[Date]
)
Functions in the Store interface that receive a Path (get, put, copy, move, remove) assume that only the root and key values are mandatory; there is no guarantee that the other attributes of Path (size, isDir, lastModified) are accurate. On the other hand, when a Store implements the list function, all three of those fields are expected to be present in the response.
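For example, a Path pointing at a hypothetical S3 object (the bucket and key names here are made up for illustration) only needs root and key populated:

val path = Path(
  root = "my-bucket",            // hypothetical bucket name
  key = "archive/2018/data.csv", // hypothetical object key
  size = None,
  isDir = false,
  lastModified = None
)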
Given that the key is modelled as a Path and the value is modelled as bytes, we can define the Store interface with the six basic functions listed above like this:
trait Store[F[_]] {
  def list(path: Path): fs2.Stream[F, Path]
  def get(path: Path, chunkSize: Int): fs2.Stream[F, Byte]
  def put(path: Path): fs2.Sink[F, Byte]
  def move(src: Path, dst: Path): F[Unit]
  def copy(src: Path, dst: Path): F[Unit]
  def remove(path: Path): F[Unit]
}
These six functions can be composed to provide any functionality required when manipulating data across stores. Note that the get function requires the "chunk size" in the basic interface, but by importing the Store syntax you get access to functions like these, which default to a 4k buffer size:
// get Stream with default 4k chunk size
def get(path: Path): Stream[F, Byte]
// consume contents stream and return as UTF-8 String
def getContents(path: Path)(implicit F: Effect[F]): F[String]
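For instance, a usage sketch (assuming a store: Store[F] and a path are already in scope, and that an Effect[F] instance is available):

import blobstore.implicits._

val bytes: fs2.Stream[F, Byte] = store.get(path)          // default 4k chunk size
val contents: F[String]        = store.getContents(path)  // whole value as a UTF-8 String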
It is also highly recommended to provide the optional path size when calling the Store.put function, because of Amazon S3's upload requirements: S3Store.put will use the provided path size when starting the data upload to S3. If no size is provided, the Amazon S3 client warns that all data will be loaded into memory first to compute the size before the transfer to S3 begins, which could cause an out-of-memory exception when transferring large amounts of data.
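A small sketch of the idea (contents, s3Store and contentLength are assumed to be in scope):

// Setting the size up front lets S3Store stream the upload instead of
// buffering the whole value in memory to compute the content length.
val sized: Path = path.copy(size = Some(contentLength))
val upload: fs2.Stream[F, Unit] = contents to s3Store.put(sized)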
Composing Streams of Bytes
Now that we have defined the Store interface, we can move on to more complex functions that demonstrate the capabilities of composable streams. Going back to LendUp's data pipelines, a very common use case is transferring files across stores. Regardless of the Store implementation, transferring a file can be achieved with:
def transferFile[F[_]](
  srcStore: Store[F],
  dstStore: Store[F],
  path: Path)(implicit F: Effect[F]): fs2.Stream[F, Unit] = {
  // List the source path, skip directories, and stream each file's bytes
  // straight into the destination store.
  srcStore.list(path).filter(!_.isDir).evalMap { p =>
    val contents: fs2.Stream[F, Byte] = srcStore.get(p, 4096)
    val sink: fs2.Sink[F, Byte] = dstStore.put(p)
    val transfer: fs2.Stream[F, Unit] = contents to sink
    transfer.compile.drain
  }
}
Notice that this transfer implementation does not assume that the provided path carries the correct size of the file to transfer. Instead, it makes a best effort to determine the size by calling list before starting the data transfer. This implementation also guarantees that data is not stored in memory or on the local filesystem at any point, even if the destination store's bandwidth is much lower than the source store's. Data is pulled from the source store's socket only as fast as we can write to the destination store. This is because fs2 is naturally "pull-based" and will only demand bytes from a source when they are needed, which eliminates the need for explicit back-pressure, as would be required in a "push-based" implementation.
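A usage sketch of transferFile (the store instances and path below are hypothetical):

// Copy everything under a hypothetical "inbox/" prefix from an SFTP store to an S3 store.
val job: F[Unit] = transferFile(
  sftpStore, // hypothetical Store[F] backed by SFTP
  s3Store,   // hypothetical Store[F] backed by S3
  Path(root = "sftp-root", key = "inbox/", size = None, isDir = true, lastModified = None)
).compile.drain // a single effect describing the whole transfer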
Transferring data across stores is such a common use case that it is already provided by importing the Store syntax, and that version is nicer: it allows transferring nested directories recursively:
import blobstore.implicits._
srcStore.transferTo(dstStore, srcPath, dstPath)
Next Steps
There are some features left to complete that, while not difficult to implement, we just have not had time for. It would be very simple to implement an in-memory store, which would make an even better mock Store than a filesystem-based one. Features like recursive listing would also be easy to add to the three current implementations of Store.
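As an illustration, here is a rough sketch of what an in-memory store could look like. This is not part of fs2-blobstore: it assumes cats-effect's Ref for state, buffers whole values as Vector[Byte] (so it ignores chunkSize), and is only meant as a test double.

import cats.effect.Sync
import cats.effect.concurrent.Ref
import cats.implicits._

// Create with: Ref.of[F, Map[String, Vector[Byte]]](Map.empty).map(new InMemoryStore(_))
class InMemoryStore[F[_]: Sync](data: Ref[F, Map[String, Vector[Byte]]]) extends Store[F] {
  private def fullKey(p: Path): String = s"${p.root}/${p.key}"

  def list(path: Path): fs2.Stream[F, Path] =
    fs2.Stream.eval(data.get).flatMap { m =>
      fs2.Stream.emits(m.collect {
        case (k, bytes) if k.startsWith(fullKey(path)) =>
          path.copy(key = k.stripPrefix(s"${path.root}/"),
                    size = Some(bytes.size.toLong), isDir = false)
      }.toSeq)
    }

  // chunkSize is ignored: the whole value is already in memory.
  def get(path: Path, chunkSize: Int): fs2.Stream[F, Byte] =
    fs2.Stream.eval(data.get).flatMap { m =>
      fs2.Stream.emits(m.getOrElse(fullKey(path), Vector.empty))
    }

  // Fine for a mock: buffer the incoming stream before storing it.
  def put(path: Path): fs2.Sink[F, Byte] =
    in => fs2.Stream.eval(
      in.compile.toVector.flatMap(bytes => data.update(_ + (fullKey(path) -> bytes)))
    )

  def copy(src: Path, dst: Path): F[Unit] =
    data.update(m => m.get(fullKey(src)).fold(m)(bytes => m + (fullKey(dst) -> bytes)))

  def move(src: Path, dst: Path): F[Unit] = copy(src, dst) *> remove(src)

  def remove(path: Path): F[Unit] = data.update(_ - fullKey(path))
}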
Other features are not so straightforward. Allowing stores to expose behaviours specific to their underlying system is not trivial, especially when putting content, as each storage mechanism provides custom features that may not be available in the others: S3, for example, allows writing data with server-side encryption and different ACLs, among many other options, while a filesystem allows appending data to an existing file. These specific features may prove difficult to support while preserving the abstraction.
If this looks like something you could use in your projects, head over to the Git repo for instructions and samples, and join the conversation in the Gitter channel.
Article written by Stew O'Connor for LendUp Credit, found on the LendUp Engineering Blog.