Last updated on 22.11.2020
Hello all!
Recently I’ve started diving into fs2 and http4s. They looked so awesome that, to properly introduce myself to them, I decided to implement something interesting. A bot for the Telegram messenger seemed like a nice idea. So here’s a small tutorial on how to implement a bot in a purely functional way, using tagless final encoding, fs2 for streaming and the http4s client to talk to the Telegram API.
Most of the code snippets here have comments that explain in more detail what’s going on. I encourage the reader not to skip them: many small details are not covered in the text, which would otherwise be twice as long.
Disclaimer 1: There are feature-rich Telegram bot libraries out there. If you need to get something done quickly and you don’t mind bringing akka into your dependencies, you’ll be better off with those solutions.
Disclaimer 2: I’m in no way an expert in either fs2 or http4s, so if you find a better way to do something presented here, please leave me a comment! 🙂
“Gimme da code!” Here’s the GitHub repo.
So what are we going to build?
A todo-list bot. To not overload the tutorial with specifics of Telegram Bot API, we’ll make it very simple:
- It will keep a separate todo-list for each chat (either personal or group one)
- Bot will be “long polling” the messages that were sent to it. I selected long polling, because it’s simpler to implement and, more importantly, it works in development mode.
- The interface will look like this:
  - /show command will make the bot answer with the current todo-list
  - /clear command will erase all the tasks for this chat
  - any other message will be interpreted as an instruction to add a new todo-list item (with message content used as list item content)
So in the end interaction with the bot should look like this:
Let’s proceed to designing the algebras.
Designing Algebras
Despite the simplicity of the bot, there’s quite a bunch of algebras we’re going to operate with:
- A simplified Telegram Bot API algebra that will only contain the requests we need
- Some kind of storage algebra that will handle the storage of our todo-lists
- Logger algebra for purely functional logging
- Higher-level todo-list bot algebra that will be composed of the algebras above
Logger algebra
We’ll start with the simplest one. Logging is quite a common task, and there are projects already that provide logging algebras out of the box. We’ll use log4cats.
For those who are wondering, a simplified example of Logger algebra would look like this:
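A minimal sketch of what such an algebra could look like. This is a simplified illustration, not log4cats' actual `Logger` definition: `SimpleLogger`, `Id` and `InMemoryLogger` are hypothetical names, and the interpreter uses an eager identity effect only to keep the example free of dependencies.

```scala
object LoggerSketch {
  // The algebra: operations are abstract in the effect type F.
  trait SimpleLogger[F[_]] {
    def info(message: String): F[Unit]
    def error(message: String): F[Unit]
  }

  // Identity effect, used here only so the sketch runs without any library.
  type Id[A] = A

  // A demo interpreter that collects lines in memory instead of printing them.
  // It is eager and side-effecting, so it is for illustration only.
  final class InMemoryLogger extends SimpleLogger[Id] {
    private val buffer = scala.collection.mutable.ListBuffer.empty[String]
    def info(message: String): Id[Unit] = { buffer += s"INFO $message"; () }
    def error(message: String): Id[Unit] = { buffer += s"ERROR $message"; () }
    def lines: List[String] = buffer.toList
  }
}
```

Code written against `SimpleLogger[F]` never learns which interpreter it gets, which is the whole point of the encoding.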
Log4cats provides several implementations; we’ll pick the slf4j one.
Storage algebra
Storage algebra should also be quite simple to digest:
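A sketch of how such an algebra might be declared. The trait and method names are hypothetical, and representing `ChatId` as `Long` and `Item` as `String` is an assumption made for brevity.

```scala
object StorageAlgebraSketch {
  type ChatId = Long  // assumption: chats are identified by a numeric id
  type Item = String  // assumption: an item is just its text

  // Operations are abstract in the effect type F.
  trait TodoListStorage[F[_]] {
    def addItem(chatId: ChatId, item: Item): F[Unit]
    def getItems(chatId: ChatId): F[List[Item]]
    def clearList(chatId: ChatId): F[Unit]
  }
}
```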
Basically, for each particular chat, it allows us to add items, query the whole list and erase it.
In a real world app, this algebra would be interpreted into some database storage service. But for our purposes an in-memory implementation will be enough.
For purely functional, asynchronous and concurrent access to shared state, we’ll use fs2.async.Ref. We could have used Ref from cats-effect 1.0.0-RC2, but at the time of writing that was not possible, because http4s depended on an incompatible version of cats-effect.
Ref will store a Map[ChatId, List[Item]], which is essentially what we want to store. Here’s the implementation:
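To show the shape of such an interpreter without pinning a library version, here is a dependency-free sketch. It deliberately swaps `fs2.async.Ref` for a plain `java.util.concurrent.atomic.AtomicReference`, and the effect `F` for a hypothetical thunk-based `Task`; the trait and method names are assumptions too, repeated here so the block compiles on its own.

```scala
import java.util.concurrent.atomic.AtomicReference

object InMemoryStorageSketch {
  type ChatId = Long  // assumption
  type Item = String  // assumption

  trait TodoListStorage[F[_]] {
    def addItem(chatId: ChatId, item: Item): F[Unit]
    def getItems(chatId: ChatId): F[List[Item]]
    def clearList(chatId: ChatId): F[Unit]
  }

  // Stand-in effect: nothing happens until `run()` is called.
  final case class Task[A](run: () => A)

  final class InMemoryStorage(ref: AtomicReference[Map[ChatId, List[Item]]])
      extends TodoListStorage[Task] {

    // Atomically applies a pure update to the whole map.
    private def update(f: Map[ChatId, List[Item]] => Map[ChatId, List[Item]]): Task[Unit] =
      Task { () =>
        ref.updateAndGet(m => f(m))
        ()
      }

    // New items are prepended, so the newest item comes first.
    def addItem(chatId: ChatId, item: Item): Task[Unit] =
      update(m => m.updated(chatId, item :: m.getOrElse(chatId, Nil)))

    def getItems(chatId: ChatId): Task[List[Item]] =
      Task(() => ref.get().getOrElse(chatId, Nil))

    def clearList(chatId: ChatId): Task[Unit] =
      update(m => m - chatId)
  }
}
```

The important property carries over to a real `Ref`: every change is a pure function over the whole map, applied atomically.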
Telegram Bot API
Telegram Bot API algebra will look this way:
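A sketch of an algebra with this shape, with one ordinary effect and one streaming effect. All names here (`BotAPI`, `BotUpdate`, `BotMessage`, `pollUpdates`, `Offset`) are hypothetical, and the data types are reduced to the fields this bot would need.

```scala
object BotApiSketch {
  type ChatId = Long  // assumption
  type Offset = Long  // assumption

  final case class BotMessage(chatId: ChatId, text: Option[String])
  final case class BotUpdate(updateId: Long, message: Option[BotMessage])

  // F is the effect of a single request, S is the effect of a streamed result.
  trait BotAPI[F[_], S[_]] {
    def sendMessage(chatId: ChatId, message: String): F[Unit]
    def pollUpdates(fromOffset: Offset): S[BotUpdate]
  }
}
```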
Yep, for this bot we’ll only need to poll incoming messages and post responses.
Notice that I introduced a separate effect S[_] for the streamed result. The idea came from this recent article. While in this particular example we’ll only use fs2.Stream[F, ?] as the streaming effect, it makes sense to abstract it out anyway. As far as I understand, abstracting over streaming effects in Scala is an area of research, so in the future we may find a better way to work with streams in tagless final encoding.
Here goes one of the most exciting parts of this post. Let’s implement bot API algebra!
First let’s see what it is constructed from. There’s a comment for each constructor parameter.
Now let’s implement the logic using this toolkit. sendMessage is a simple one, so we’ll start with it. It’s going to call the endpoint with the same name:
Here we’re using query parameters flavour of the API, which is not very convenient in general. For production bots there are other ways, including JSON.
Also, we don’t care about result body here, so we pass Unit type to the client.expect call.
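To make the query parameters flavour concrete, here is a dependency-free sketch that only builds the request URL; it does not use http4s and sends nothing. The `https://api.telegram.org/bot<token>/<method>` layout and the `chat_id` and `text` parameters come from the Telegram Bot API; the function name is hypothetical.

```scala
import java.net.URLEncoder
import java.nio.charset.StandardCharsets

object SendMessageUrlSketch {
  // Builds the sendMessage URL with the message passed as query parameters.
  def sendMessageUrl(token: String, chatId: Long, text: String): String = {
    val encodedText = URLEncoder.encode(text, StandardCharsets.UTF_8.name())
    s"https://api.telegram.org/bot${token}/sendMessage?chat_id=${chatId}&text=${encodedText}"
  }
}
```

An HTTP client's URI builder would normally do the encoding for you; it is spelled out here only to show what ends up on the wire.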
Now to the juice — streaming updates.
Telegram Bot API — stream updates
In plain words, what we’re going to do is to repeatedly call the getUpdates endpoint. Since getUpdates returns a list of updates, we’ll also have to flatten the result of each call. Finally, our stream has to be aware of the last requested offset — we don’t want to receive any duplicates.
So let’s begin step by step.
repeatedly call
Here we create a stream of a single Unit value, repeat it and lift it into our effect F using covary.
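A minimal sketch of this construction, assuming fs2 is on the classpath; `ticks` and `firstThree` are hypothetical names. The pure variant is cut to three elements only so that it can be inspected.

```scala
import fs2.Stream

object RepeatSketch {
  // An infinite stream of Unit values, lifted into an arbitrary effect F.
  def ticks[F[_]]: Stream[F, Unit] = Stream(()).repeat.covary[F]

  // The same construction kept pure and truncated, so it can be turned into a List.
  val firstThree: List[Unit] = Stream(()).repeat.take(3).toList
}
```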
Here we don’t add any delay between stream elements, because we’ll instead rely on long polling timeout for throttling, which is recommended in the API docs. Just in case you wonder — throttling an fs2 stream is very simple.
Next step.
call the getUpdates endpoint
and
be aware of the last requested offset
This feels like a stateful stream stage. Moreover, it also requires running side effects to obtain the new state: the new offset can only be obtained from the pack of fresh updates.
Let’s dive a little deeper here. For pure stateless mapping fs2 defines a map function on streams:
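A tiny illustration on a pure stream, assuming fs2 is on the classpath (the object name is hypothetical):

```scala
import fs2.Stream

object MapSketch {
  // A pure, stateless transformation of every element.
  val doubled: List[Int] = Stream(1, 2, 3).map(_ * 2).toList
}
```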
Then, if we need to run an effectful mapping, we use evalMap (notice that the mapping function is now a Kleisli-like effectful function):
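A small sketch with cats-effect's `IO` as the effect, assuming the fs2 and cats-effect versions used in this post. `describe` is a hypothetical stand-in for a real effectful call.

```scala
import cats.effect.IO
import fs2.Stream

object EvalMapSketch {
  // A hypothetical effectful function; here the effect only wraps a pure computation.
  def describe(n: Int): IO[String] = IO(s"item #$n")

  // Each element is mapped by running the effect; nothing runs until the IO is executed.
  val described: IO[List[String]] =
    Stream(1, 2, 3).covary[IO].evalMap(n => describe(n)).compile.toList
}
```

Running it with `described.unsafeRunSync()` works as-is on the cats-effect versions of that time; on cats-effect 3 it additionally needs `import cats.effect.unsafe.implicits.global`.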
If we want to go from a simple map to a stateful map, there’s mapAccumulate. Now you need to specify initial state and a way to obtain the new state after mapping each element:
Notice also, that the resulting stream for each incoming element emits both the stream state after the mapping and the mapped element itself.
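For instance, a running sum as the state and a scaled value as the output (a pure sketch, assuming fs2 is on the classpath; the object name is hypothetical):

```scala
import fs2.Stream

object MapAccumulateSketch {
  // State: running sum, starting at 0. Output: the element multiplied by 10.
  // Each emitted pair is (state after this element, mapped element).
  val result: List[(Int, Int)] =
    Stream(1, 2, 3).mapAccumulate(0)((sum, n) => (sum + n, n * 10)).toList
}
```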
Probably you see where it’s going — we need both! Each of our polls needs the latest offset. But to update the offset for the next request we have to execute the poll! So we want something like mapAccumulate, but the calculation step has to be effectful.
And there is such a combinator; it’s (quite expectedly) called evalMapAccumulate:
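A sketch of the combinator on a toy example, assuming the fs2 and cats-effect versions used in this post. The `step` function is hypothetical; it plays the role of an effectful call that returns both the new state and an output.

```scala
import cats.effect.IO
import fs2.Stream

object EvalMapAccumulateSketch {
  // Effectful step: from the running total and the next element,
  // produce the new total and an output, both inside IO.
  def step(total: Int, n: Int): IO[(Int, String)] =
    IO((total + n, s"added $n"))

  val totals: IO[List[(Int, String)]] =
    Stream(1, 2, 3)
      .covary[IO]
      .evalMapAccumulate(0)((total, n) => step(total, n))
      .compile
      .toList
}
```

As with `mapAccumulate`, each emitted pair carries the state after the step together with the step's output.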
Now we’re ready to proceed with our stream of updates:
requestUpdates stage has to do 2 things: get the new messages from the Telegram Bot API and calculate the new offset:
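The offset part can be sketched as a pure function. Telegram's getUpdates expects the offset to be one greater than the highest `update_id` received so far; the `Update` type and the `nextOffset` name below are hypothetical, and keeping the current offset on an empty response is an assumption.

```scala
object OffsetSketch {
  final case class Update(updateId: Long)

  // One past the highest update id seen, or the current offset if nothing arrived.
  def nextOffset(current: Long, updates: List[Update]): Long =
    updates.foldLeft(current)((offset, update) => math.max(offset, update.updateId + 1))
}
```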
Good! One last step remains.
flatten the result
Each response contains a list of updates. For a better API user experience we’d like to have each update as a standalone stream element. This is quite simple to achieve with flatMap:
Basically, we transform each polled bunch of updates into a separate stream of updates (using Stream.emits ), and concatenate them together into a single flattened stream.
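On a pure stream the same flattening looks like this (a sketch assuming fs2 is on the classpath, with lists of numbers standing in for batches of updates):

```scala
import fs2.Stream

object FlattenSketch {
  // Three "responses": two updates, an empty batch, one update.
  val batches: Stream[fs2.Pure, List[Int]] =
    Stream(List(1, 2), List.empty[Int], List(3))

  // Each batch becomes its own stream, and the streams are concatenated in order.
  val flattened: List[Int] = batches.flatMap(batch => Stream.emits(batch)).toList
}
```

Empty batches simply contribute nothing, which is what we want when a long poll times out without new messages.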
And that’s it. The types line up, and our stream of updates is ready to be processed by some domain-specific algebras.
Todo-list bot logic
Now that we have all the lower level machinery in place, let’s develop the business logic.
First of all, let’s define all the possible inputs to our bot. We’ll also need a way to convert raw incoming messages into domain specific commands:
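A sketch of how the commands and the conversion could be modelled, following the interface described at the beginning of the post. The type and function names are hypothetical, and trimming the text before matching is an assumption.

```scala
object BotCommandSketch {
  type ChatId = Long  // assumption

  sealed trait BotCommand
  final case class ShowTodoList(chatId: ChatId) extends BotCommand
  final case class ClearTodoList(chatId: ChatId) extends BotCommand
  final case class AddEntry(chatId: ChatId, content: String) extends BotCommand

  // /show and /clear are recognised; anything else becomes a new todo-list item.
  def fromRawMessage(chatId: ChatId, text: String): BotCommand =
    text.trim match {
      case "/show"  => ShowTodoList(chatId)
      case "/clear" => ClearTodoList(chatId)
      case other    => AddEntry(chatId, other)
    }
}
```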
And now we have everything we need to define todo-list bot algebra:
This is a higher level overview. The only interaction this algebra provides is to launch the bot process. The process itself is a simple stream that polls the updates (using the lower level bot API) and handles each update with an effect — hence the evalMap we’re already familiar with.
Let’s take a look at both stages.
pollCommands is quite simple: we start long polling from zero offset and map all non-empty messages into domain commands. The conversion involves jumping through some hoops, since message.text is an Option.
Also, in a real-world implementation we’d persist the offset somewhere after each processed command, so that when the bot is restarted, that offset would be used instead of zero. But actually, it’s not that bad: once getUpdates has been called with some non-zero offset, the long polling API marks all the updates before that offset as read, and they are no longer served, even if you then call the API with zero offset.
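The Option handling can be sketched on plain lists. The case classes below are hypothetical, trimmed-down versions of the API types, and `textMessages` is an illustrative name.

```scala
object PollCommandsSketch {
  final case class Chat(id: Long)
  final case class Message(chat: Chat, text: Option[String])
  final case class Update(updateId: Long, message: Option[Message])

  // Keeps only updates that carry a message with text,
  // pairing each text with the id of the chat it came from.
  def textMessages(updates: List[Update]): List[(Long, String)] =
    for {
      update  <- updates
      message <- update.message.toList
      text    <- message.text.toList
    } yield (message.chat.id, text)
}
```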
handleCommand is the place where we’re going to invoke our storage algebra. This one should be pretty clear, despite the verbosity:
All commands are handled in a similar fashion: update the storage and trace the change to the log.
There’s one small caveat with addItem. I want to make the bot a little less boring than a stone, so why not give it several different answers to choose from when an item is added?
These are the cases where we should carefully track and suspend side effects. Random number generation is a side effect, so we use Sync[F] to suspend it until the “end of the universe”.
Also, List.head is unsafe, so we use F.catchNonFatal to lift the error into F. It’s possible because Sync[F] extends MonadError[F, Throwable]:
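A minimal sketch of this idea, assuming cats-effect is on the classpath. The `pickRandom` name and the use of `Random.shuffle` are assumptions about how the choice could be made.

```scala
import cats.effect.Sync
import scala.util.Random

object RandomAnswerSketch {
  def pickRandom[F[_]](answers: List[String])(implicit F: Sync[F]): F[String] =
    // Shuffling is a side effect, so it is suspended with delay.
    F.flatMap(F.delay(Random.shuffle(answers))) { shuffled =>
      // head throws on an empty list; catchNonFatal turns that into a failed F.
      F.catchNonFatal(shuffled.head)
    }
}
```

With `IO` as the effect, `pickRandom[IO](Nil)` yields a failed `IO` instead of throwing at the call site.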
Wiring up and “launching” the bot process
Now we have everything to construct our bot.
Quite a lot going on here:
- We derive the JSON decoder for API response using auto derivation from circe generic
- Http client is created safely using Http1Client.stream. This produces a stream of a single element, and guarantees that all allocated resources (connection pool) are released when the stream either finishes or crashes.
- We flatMap the http client stream into our todo-list bot process stream. To create it, we cook all the ingredients. Some of these “cooking” steps are also side-effectful: for example the Logger interpreter or the Ref instance for our storage algebra.
- Stream.force at the end lets us go from F[Stream[F, A]] to the Stream[F, A] that we’d like to return
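The last point can be sketched in isolation, assuming the fs2 and cats-effect versions used in this post. The values are hypothetical; `prepared` stands for the effectful "cooking" that ends by producing a stream.

```scala
import cats.effect.IO
import fs2.Stream

object ForceSketch {
  // An effect that, when run, produces a stream (for example after allocating some state).
  val prepared: IO[Stream[IO, Int]] = IO(Stream(1, 2, 3).covary[IO])

  // Stream.force evaluates the effect as part of the stream and flattens the result.
  val flattened: Stream[IO, Int] = Stream.force(prepared)
}
```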
End of the universe
We can be really proud of ourselves — we managed to develop a nice useful computation without even specifying our effect type, let alone executing any side-effects! That’s cool.
Now we’re ready to pull the trigger: specify our effect type and actually run the computation. We’ll use fs2.StreamApp[IO] helper so that we don’t even have to write “unsafe” with our own hands.
To test the bot out, you’ll have to create one for yourself. It’s a no-brainer, just follow the instructions here. Once you get a token, just plug it in and you’re ready to go!
To avoid exposing my test bot token, the app reads it from the TODOLIST_BOT_TOKEN environment variable. If that doesn’t suit you, just put your token directly into the TodoListBotProcess constructor instead.
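Reading the variable safely could look like the sketch below. It is dependency-free; `readToken` is a hypothetical helper, and treating an empty value as missing is an assumption.

```scala
object TokenSketch {
  // Takes the environment as a parameter so the function stays pure and testable.
  def readToken(env: Map[String, String]): Either[String, String] =
    env
      .get("TODOLIST_BOT_TOKEN")
      .filter(_.nonEmpty)
      .toRight("TODOLIST_BOT_TOKEN is not set")
}
```

In the application you would call it as `TokenSketch.readToken(sys.env)`, ideally inside the effect, since reading the environment is itself a side effect.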
Conclusion
So it turns out that you don’t need akka or other complex frameworks to build a Telegram bot. Moreover, you can do it in a purely functional way, and it actually looks beautiful! And as a bonus, the process of writing a tagless final program is a real joy. I definitely recommend trying it out 🙂
Some ideas to implement as an exercise for interested readers:
- Allow editing items (through editing the original messages in the chat)
- Wrap the bot with an administration HTTP API, to send announcements for example
That’s all I have this time, thanks for reading!