Scala, MongoDB and Cats-Effect

Boris The Astronaut
May 29, 2021

MongoDB is an open source database that uses a document-oriented data model and its own JSON-like query language instead of SQL. It is one of the most widely used NoSQL databases around today. In comparison to traditional SQL databases, MongoDB does not use the usual rows and columns to model its data; instead, it stores data as documents in collections using the BSON (Binary JSON) format, where the basic unit of data is a set of key-value pairs.

For Scala, there are several MongoDB drivers available, the most popular of which are the official MongoDB Scala Driver and ReactiveMongo.

However, in this blog post, I am going to introduce a more recent MongoDB client: mongo4cats, a wrapper around the native MongoDB Java client that is compatible with the Cats-Effect (3.x) and FS2 (3.x) libraries. Through this integration with Cats-Effect, MongoDB operations can be executed as fully non-blocking, asynchronous I/O in a purely functional way.

About Cats-Effect

Cats-Effect is a high-performance, asynchronous, composable framework for building real-world applications in a purely functional style. As a library, Cats-Effect provides an IO monad that can be used for capturing, controlling and composing effects (such as making a connection to a database or executing a query), and allows performing them within a resource-safe, typed context. More on it in the Cats-Effect documentation.
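
To make the idea of "capturing and composing effects" more concrete, here is a minimal sketch, assuming Cats-Effect 3.x is on the classpath. The connect and runQuery functions are hypothetical stand-ins invented for this illustration; they are not part of mongo4cats or any MongoDB driver.

```scala
import cats.effect.{IO, IOApp}

object IoSketch extends IOApp.Simple {

  // Hypothetical stand-ins for real database calls.
  // Defining them runs nothing; each IO is only a description of an effect.
  def connect(url: String): IO[String] =
    IO(s"connection to $url")

  def runQuery(connection: String, query: String): IO[Int] =
    IO(query.length) // pretend the query returns a count

  // Composing effects in a for-comprehension only builds a bigger description.
  val program: IO[Int] =
    for {
      conn  <- connect("mongodb://localhost:27017")
      count <- runQuery(conn, "find payments")
    } yield count

  // The runtime executes the description here, at the edge of the application.
  val run: IO[Unit] = program.flatMap(count => IO.println(s"result: $count"))
}
```

Nothing is executed until IOApp runs the final value, which is what makes it safe to pass such descriptions around and combine them.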

Dependencies

To start, we will need to add the required dependencies to our build.sbt file:

libraryDependencies ++= Seq(
  "io.github.kirill5k" %% "mongo4cats-core"  % "0.4.5",
  "io.github.kirill5k" %% "mongo4cats-circe" % "0.4.5"
)

mongo4cats-core brings the core functionality needed for connecting to the database and executing queries, whereas mongo4cats-circe adds extra syntax for encoding and decoding our entities with Circe codecs (more on it later).

Connecting to the database

To connect to our database, we first need to create an instance of MongoClient[F]. MongoClient[F] represents a pool of connections for a given MongoDB server deployment, and typically only one instance of this class is required per application (even with multiple operations executed concurrently). The easiest way of creating a client is by calling the fromConnectionString method:

import cats.effect.{IO, Resource}
import mongo4cats.client._

val mongoClient: Resource[IO, MongoClient[IO]] =
  MongoClient.fromConnectionString[IO]("mongodb://localhost:27017")

When creating a client using any of the MongoClient constructor methods, we get a Resource[IO, MongoClient[IO]], which ensures that the connection is closed after use.
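
The guarantee that Resource gives can be seen without a running database. The following is a minimal sketch, assuming Cats-Effect 3.x; FakeClient and fakeClient are hypothetical names made up for this illustration and only record lifecycle events in a Ref instead of opening real connections.

```scala
import cats.effect.{IO, IOApp, Ref, Resource}

object ResourceSketch extends IOApp.Simple {

  // Hypothetical client that records events instead of talking to a server.
  final class FakeClient(log: Ref[IO, List[String]]) {
    def query(q: String): IO[Unit] = log.update(_ :+ s"query: $q")
  }

  // Resource.make pairs an acquire action with a release action.
  def fakeClient(log: Ref[IO, List[String]]): Resource[IO, FakeClient] =
    Resource.make(
      log.update(_ :+ "open").as(new FakeClient(log)) // acquire
    )(_ => log.update(_ :+ "close"))                  // release

  // use runs the body and then the release action, even if the body fails.
  val events: IO[List[String]] =
    for {
      log    <- Ref.of[IO, List[String]](Nil)
      _      <- fakeClient(log).use(_.query("find"))
      result <- log.get
    } yield result

  val run: IO[Unit] = events.flatMap(es => IO.println(es.mkString(", ")))
}
```

The recorded events come out in the order open, query, close, which mirrors what happens to the underlying connection pool when a real client is used.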

Once we have our client, we can then start using it for making connections to our database:

mongoClient.use { client =>
  for {
    db <- client.getDatabase("testdb")
  } yield ()
}

Data modelling

Internally, MongoDB stores all of its data in the BSON format, a close cousin of the traditional JSON that we are all used to. The similarities between these two formats allow us to derive MongoDB codecs with tools that are normally used for transforming case classes into plain JSON. One such tool is Circe.

To begin with, let’s create a simple case class to model data in our collection:

import java.time.Instant
import mongo4cats.bson.ObjectId

sealed trait PaymentMethod

final case class CreditCard(
  name: String,
  number: String,
  expiry: String,
  cvv: Int
) extends PaymentMethod

final case class Paypal(email: String) extends PaymentMethod

final case class Payment(
  id: ObjectId,
  amount: BigDecimal,
  method: PaymentMethod,
  date: Instant
)

Once our model is defined, we can get our collection from the database:

import io.circe.generic.auto._
import mongo4cats.circe._

for {
  db   <- client.getDatabase("testdb")
  coll <- db.getCollectionWithCodec[Payment]("payments")
} yield ()

Calling the getCollectionWithCodec[T] method on MongoDatabase[F] requires an instance of MongoCodecProvider[T] to be available in the implicit scope. The collection uses it internally to obtain the codecs for converting our entities to and from BSON documents. Luckily, since we have this import included:

import mongo4cats.circe._

the codec provider will be derived automatically with the help of Encoder[T] and Decoder[T] instances brought in by including Circe’s automatic derivation import:

import io.circe.generic.auto._
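
To see what this automatic derivation provides on its own, here is a small sketch that round-trips a value through plain JSON using Circe only. It assumes circe-generic and circe-parser are on the classpath. SimplePayment is a hypothetical, trimmed-down version of the Payment class without the ObjectId and Instant fields, so that the example does not depend on mongo4cats.

```scala
import io.circe.generic.auto._
import io.circe.parser.decode
import io.circe.syntax._

sealed trait PaymentMethod
final case class CreditCard(name: String, number: String, expiry: String, cvv: Int) extends PaymentMethod
final case class Paypal(email: String) extends PaymentMethod

// Hypothetical simplified model: no ObjectId or Instant fields.
final case class SimplePayment(amount: BigDecimal, method: PaymentMethod)

object CirceSketch {
  val original: SimplePayment =
    SimplePayment(BigDecimal("2.5"), Paypal("john.bloggs@test.com"))

  // Encoder[SimplePayment] is derived by generic.auto, including the sealed trait.
  val json: String = original.asJson.noSpaces

  // Decoder[SimplePayment] is derived the same way.
  val roundTripped: Either[io.circe.Error, SimplePayment] =
    decode[SimplePayment](json)

  def main(args: Array[String]): Unit = {
    println(json)
    println(roundTripped == Right(original))
  }
}
```

The same Encoder and Decoder instances are what the codec provider builds on when our entities are written to and read from the collection.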

Now we have everything ready to start working with the data!

Inserting documents

To insert an object into the database, simply call insertOne to insert a single document or insertMany to insert a sequence of documents:

val creditCard = CreditCard("John Bloggs", "1111222233334444", "1021", 919)
val paypal = Paypal("john.bloggs@test.com")
val payment1 = Payment(ObjectId(), BigDecimal(2.5), paypal, Instant.parse("2021-04-05T12:00:00Z"))
val payment2 = Payment(ObjectId(), BigDecimal(9.99), creditCard, Instant.parse("2021-04-12T12:00:00Z"))

for {
  ...
  _ <- coll.insertOne(payment1)
  _ <- coll.insertMany(List(payment2))
} yield ()

Querying documents

The most straightforward way to query a collection is to use its find method:

import mongo4cats.collection.operations.Filter
...
val afterDateFilter = Filter.gte("date", Instant.parse("2021-04-01T00:00:00Z"))
val beforeDateFilter = Filter.lt("date", Instant.parse("2021-05-01T00:00:00Z"))

for {
  ...
  payments <- coll.find
    .filter(afterDateFilter && beforeDateFilter)
    .sortByDesc("amount")
    .limit(5)
    .all
} yield ()

As the example shows, calling find returns a query builder that lets you add filters, sorts, projections and limits. The query is then executed by calling one of the following:

  • first: returns an option containing the first document that matches the query
  • all: returns a sequence containing all found documents
  • stream: returns a stream in which documents are emitted as soon as they are received from the database

When building a filter using the Filter class from the mongo4cats.collection.operations package, a wide variety of query predicates is available for narrowing the result down to the documents you are interested in. Moreover, these filters can be combined using the logical operators && (and) and || (or).
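
As a rough sketch of how such combinations can be named and reused, the snippet below builds filters using only the operations that appear elsewhere in this post (Filter.gte, Filter.lt, && and ||). The object name PaymentFilters and the threshold of 100 are made up for illustration, and it is an assumption here that the combined values can be annotated with the Filter type.

```scala
import java.time.Instant
import mongo4cats.collection.operations.Filter

object PaymentFilters {
  // Payments made in April 2021 (lower bound inclusive, upper bound exclusive).
  val inApril: Filter =
    Filter.gte("date", Instant.parse("2021-04-01T00:00:00Z")) &&
      Filter.lt("date", Instant.parse("2021-05-01T00:00:00Z"))

  // Payments of 100 or more, regardless of date (hypothetical threshold).
  val large: Filter = Filter.gte("amount", BigDecimal(100))

  // Combining filters yields another filter, so the result can be combined again.
  val aprilOrLarge: Filter = inApril || large
}
```

A value such as PaymentFilters.aprilOrLarge could then be passed to coll.find.filter(...) in the same way as the filter in the query example.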

Updating documents

Now, let’s see how we can find a document and update some of its fields.

import mongo4cats.collection.operations.Update

val payment1Filter = Filter.idEq(payment1.id)
val amountUpdate = Update
  .set("amount", BigDecimal(5.0))
  .currentTimestamp("updatedAt")

for {
  ...
  found <- coll.findOneAndUpdate(payment1Filter, amountUpdate)
} yield ()

This example introduces a new operation type, Update, which allows us to build and sequence multiple update operations that will then be executed on the documents matching the provided filter. Here we find the document whose id matches the id of payment1, update its amount, set a new updatedAt field to the current timestamp and get back the original document. Alternatively, there are also methods for just updating one or multiple documents: updateOne and updateMany, respectively. Both of these methods have an API similar to findOneAndUpdate.

Furthermore, there are also methods for replacing or deleting documents: findOneAndReplace and findOneAndDelete, respectively.

Streaming

As mentioned above, a find query can return its result as a single element, as a sequence of elements or as an FS2 Stream. FS2 is a library for purely functional, effectful, and polymorphic stream processing. It is built upon Cats and Cats-Effect, but its core types (streams and pulls) are polymorphic in the effect type (as long as it is compatible with the Cats-Effect typeclasses), and thus can be used with other effect libraries.

for {
  ...
  total <- coll.find
    .stream
    .map(_.amount)
    .fold(BigDecimal(0))(_ + _)
    .compile
    .last
} yield ()

In this example, we stream all documents from our collection and calculate the total amount paid.
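
The fold and compile.last steps can be tried out without a database. Below is a minimal sketch, assuming FS2 3.x and Cats-Effect 3.x, in which an in-memory list stands in for the amounts coming from the collection. The names StreamSketch, amounts and total are hypothetical.

```scala
import cats.effect.{IO, IOApp}
import fs2.Stream

object StreamSketch extends IOApp.Simple {

  // In-memory stand-in for coll.find.stream.map(_.amount).
  def amounts(values: List[BigDecimal]): Stream[IO, BigDecimal] =
    Stream.emits(values).covary[IO]

  // fold emits a single element with the accumulated sum;
  // compile.last returns that element wrapped in an Option.
  def total(values: List[BigDecimal]): IO[Option[BigDecimal]] =
    amounts(values).fold(BigDecimal(0))(_ + _).compile.last

  val run: IO[Unit] =
    total(List(BigDecimal("2.5"), BigDecimal("9.99"))).flatMap(t => IO.println(t))
}
```

Note that the result is an Option[BigDecimal] rather than a plain BigDecimal, because compile.last has to account for streams that emit nothing. With fold the initial value is emitted even for an empty input, so the Option is always defined in this case.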

Note that calling stream creates an unbounded stream that keeps pulling documents until the collection is exhausted or memory runs out. If there are a lot of documents in the collection, creating a stream like this might lead to out-of-memory failures. To avoid this, there is an alternative method available, boundedStream(capacity), which holds at most capacity elements at any given time.

Bulk writes

The last thing that I would like to mention is the bulkWrite operation, which can be called on an instance of MongoCollection[F, T]. bulkWrite takes a sequence of write operations and executes each of them. By default, these operations are executed in order; however, this can be overridden with BulkWriteOptions.

for {
  ...
  commands = List(
    WriteCommand.DeleteOne(Filter.lt("amount", BigDecimal(2.5))),
    WriteCommand.InsertOne(payment1),
    WriteCommand.UpdateOne(Filter.eq("index", 50), Update.set("amount", BigDecimal(5.0)))
  )
  res <- coll.bulkWrite(commands, BulkWriteOptions(ordered = false))
} yield ()

Conclusion

In this blog post, I tried to highlight the most common use cases when working with MongoDB and to show how easy it is to use it with an effect system like Cats-Effect through mongo4cats. The examples presented here cover only the most basic and common parts; for more advanced usage patterns, refer to the mongo4cats GitHub repository or the official MongoDB documentation.
