RxSwift by Examples #4 – Multithreading
When we are talking about Rx, often times it all boils down to connecting (binding) our data sources with UI. You can see it clearly in our examples – we are connecting data to UI on a daily basis.
In previous parts of the series, apart from UI bindings, we were also talking about retrieving the data. When it comes to fetching data from the server, most of the time we have to parse it somehow. If data is large enough, the task of mapping can be memory and time consuming, especially when operation is scheduled on the main thread, we will block UI which will result in horrendous user experience of our end product.
In part #3 of the series we were mapping objects as well. We’ve used something called MainScheduler.instance
in a few operators, because “we had to make sure our data is on the main thread”. In fact, this is a Scheduler, not a Thread, so why we were talking about threads at all? Moreover, we’ve just learned that we shouldn’t map objects on the main thread, but it seems like we did it in our last example. What is going on?
You can find out all the answers in today’s article! Remind yourself earlier parts of the series, grab a drink, and get ready for action!
Schedulers
We will start with a little bit of theory about schedulers. When we are doing some operations with Rx, by definition it is all done on the same thread. Unless you don’t change the thread manually, entry point of the chain will begin on the current thread and it will also dispose on the same thread.
Schedulers are not really a threads, but as the name suggests they are scheduling the tasks that they are given. We have 2 types of schedulers: serial and concurrent. Below is the list with the schedulers that are already built-in:
- CurrentThreadScheduler (Serial) – schedules on the current thread, this is also the default scheduler.
- MainScheduler (Serial) – schedules on the main thread.
- SerialDispatchQueueScheduler (Serial) – schedules on a specific queue (
dispatch_queue_t
). - ConcurrentDispatchQueueScheduler (Concurrent) – schedules on a specific queue (
dispatch_queue_t
). - OperationQueueScheduler (Concurrent) – schedules on a specific queue (
NSOperationQueue
).
Interesting thing is that if you pass a concurrent queue to a serial scheduler, RxSwift will make sure that it is transformed into a serial queue.
The other way around (passing serial queue to concurrent scheduler) shouldn’t cause any problems as well, but we’d rather avoid that, if possible.
You can also implement your own scheduler for some customization, this document is really helpful if you do so.
observeOn() & subscribeOn()
These two methods are really the core to multithreading. When you look at them, you might think that from the name it is really obvious what they should do. In reality, few people understand what is the difference between them and why specific behaviour happened with its usage. But let’s forget about them for a second, we’ve just got a call from a friend and we are going on a road trip!
Well… maybe not exactly that kind of an adventure, but really close. Our friend, Emily, had asked us to take care of her cat Ethan, while she was gone for holidays. She just came back and we need to take Ethan back to his mom. We have to drive few hours to her house, so better prepare for some road trip fun!
Normally when we drive to Emily, we take the default route through the highway. But today we wanted to change something in our life and we choose to go with the two-lane freeway. The weather is so great that after an hour of driving we stop the car to breathe some fresh air. Suddenly it hits us – driving in that weather on a highway would be the best thing ever. And because fresh air always had a bad impact on us, we decide to go back on the good old highway. With a company of good music, fine cat and a beautiful weather, after a few hours of driving we finally meet Emily, deliver the cat, and everyone is happy. And we’ve just learned both observeOn()
and subscribeOn()
.
To clarify – we are an Observable, Ethan is our produced Signal, route is a Scheduler and Emily is an Observer. Emily subscribes to us, and she believes that she will get a new signal (a cat). We also have a default route when we drive to Emily. Same thing with Rx, we also have there a default scheduler. But this time we’ve chosen to use a different route (scheduler) as a starting point. When you want to start your trip with route different than the default one, you use subscribeOn()
method. Here’s the catch: if you use subscribeOn()
, you can’t be sure that in the end of the trip (subscribeNext()
used by Emily) you will be on the same route it started. You just guarantee that you will start there.
Second method, observeOn()
can change the route as well. But it is not constrained to the beginning of the trip – you can switch the route using observeOn()
anytime of the trip. In comparison, subscribeOn()
switches route only in the beginning – that’s the difference. Most of the time you will use observeOn()
though.
Alright, back to the cat delivery. Pseudo-code that would represent our delivery in RxSwift could look like this:
1 2 3 4 5 6 7 8 9 10 | catObservable // 1 .breatheFreshAir() // 2 .observeOn(MainRouteScheduler.instance) // 3 .subscribeOn(TwoLaneFreewayScheduler.instance) // 4 .subscribeNext { cat in // 5 if cat is Ethan { hug(cat) } } .addDisposableTo(disposeBag) |
Step-by-step:
1. We subscribe to Cat observable, which emits Cat signals.
2. We should be on the same scheduler we were before the subscription (which is the default behaviour of Rx).
3. Switch the scheduler to the MainRouteScheduler
. Now every operation below this one will be scheduled on MainRouteScheduler
(of course if we don’t change the scheduler again later in the pipeline).
4. Now we say that we start the chain on TwoLaneFreewayScheduler
. So breatheFreshAir()
will be scheduled on TwoLaneFreewayScheduler
, and then the scheduler is changed again using observeOn()
.
5. subscribeNext()
gets scheduled by MainRouteScheduler
. If we didn’t add the observeOn()
before, it would get scheduled by TwoLaneFreewayScheduler
.
LET’S TALK ABOUT YOUR APP
We’re 100% office based team with 7-years’ experience
in mobile & web app development
In summary: subscribeOn()
points in where to start the whole chaining, observeOn()
points in where to head next. Image below (courtesy of reactivex.io) should now be pretty clear about what is happening when these methods are called. Notice the blue color of arrows that shows subscribeOn()
scheduler and orange & pink colours which shows two different schedulers used when each of observeOn()
is called.
And with that in mind we can point to our Example!
Example
In part #3 (which has knowledge that is mandatory in this example, so please be sure to review that one!) we’ve searched issues for given repository on GitHub. Today we will search for repositories of given username, also on GitHub. This time however, we will use Alamofire for network requests and ObjectMapper for parsing our objects. Thanks to awesome RxSwiftCommunity, we also have an extension to Alamofire and RxSwift, called RxAlamofire, which I’ve also mentioned in previous article.
First let’s create a project as we created in a tutorial before. We will also use CocoaPods with RxAlamofire/RxCocoa (which has RxSwift, RxCocoa and Alamofire as dependencies) and ObjectMapper. Our Podfile should look like this:
1 2 3 4 5 6 7 8 9 | platform :ios, '8.0' use_frameworks! target 'RxAlamofireExample' do pod 'RxAlamofire/RxCocoa' pod 'ObjectMapper' end |
Now onto the coding part! As you probably know, making an outline before doing the task is really helpful, so we will try to think about what we need to do and how we can approach this. Example outline in our case could be:
- Create UI, probably standard one with
UISearchBar
andUITableView
. - Observe search bar, and every time new value is given, transform it to array of repositories (if possible). Here we will need model for network requests.
- Update the table view with our new data. We need to think about schedulers and how to not flood the UI.
Step 1 – Controller and UI.
We will start with UI which again is just a UITableView
and UISearchBar
. You can follow the design in the GIF above, or you can create your own – what suits you the most!
Next, we will need a controller that is going to manage everything: starting from observing text field and ending on passing the fetched repositories to our table view. Let’s create new file, called RepositoriesViewController.swift
, and prepare our controller with importing modules and basic configuration:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | import UIKit import ObjectMapper import RxAlamofire import RxCocoa import RxSwift class RepositoriesViewController: UIViewController { @IBOutlet weak var tableView: UITableView! @IBOutlet weak var searchBar: UISearchBar! override func viewDidLoad() { super.viewDidLoad() setupRx() } func setupRx() { } } |
Again we prepared the setupRx()
method already, because we can feel that we will rely heavily on Rx (is that a prophecy?).
So let’s create our observable from the search bar’s rx_text
property, like in our latest example (throttle()
& distinctUntilChanged()
included). But this time let’s add filter to that: we don’t really want empty values. We will just leave the latest request in a table view when that happens. So the observable should look like the one below:
1 2 3 4 5 6 7 8 9 10 11 | class RepositoriesViewController: UIViewController { ... var rx_searchBarText: Observable<String> { return searchBar .rx_text .filter { $0.characters.count > 0 } // notice the filter new line .throttle(0.5, scheduler: MainScheduler.instance) .distinctUntilChanged() } ... } |
And we will just add it as a variable to our RepositoryViewController
. Now we would need to add connections between our new observable transformed into Observable<[Repository]>
and pass back to UITableView
. And we’ve done that before!
Step 2 – Network model and mapping objects
Right, Network model! But before that, we need a setup for mapping our objects. This time we will use different mapper, just so you know there are a lot of good options and you should always search for the one you like the most. ObjectMapper
is also an awesome mapper and the configuration is kinda similar to the Mapper
library. Let’s create new file called Repository.swift
and implement the mapping setup for Repository
object:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | import ObjectMapper class Repository: Mappable { var identifier: Int! var language: String! var url: String! var name: String! required init?(_ map: Map) { } func mapping(map: Map) { identifier <- map["id"] language <- map["language"] url <- map["url"] name <- map["name"] } } |
Perfect! That was the last part before the real action…
We have a controller, we have a model for Repository
object… Yeah, time for our network model!
We will initialize the model with specific Observable<String>
and implement method that returns Observable<[Repository]>
. Then we can connect it to our view in RepositoriesViewController
. The initial implementation of our RepositoryNetworkModel.swift
could look like:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | import ObjectMapper import RxAlamofire import RxCocoa import RxSwift struct RepositoryNetworkModel { private var repositoryName: Observable<String> private func fetchRepositories() -> Driver<[Repository]> { ... } } |
Looks pretty casual at first. But if you look closely, we don’t actually return Observable<[Repository]>
, but Driver<[Repository]>
instead. What is this Driver
guy and why you are lying to me all the time? ?
Well… We are talking about Schedulers today. And when you want to bind data to your UI, we always want to do it with usage of MainScheduler
. And basically that is the role of a Driver
. Driver
is a Variable
that says “Okay buddy, I’m gonna be on the main thread so don’t worry and bind me!”. This way we make sure that our binding won’t be error-prone and we can safely do the connection.
Okay, but what about implementation? Well, let’s start with flatMapLatest()
that we used already before, and transform our Observable<String>
to Observable<[Repositories]>
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | struct RepositoryNetworkModel { ... private func fetchRepositories() -> Driver<[Repository]> { return repositoryName .flatMapLatest { text in return RxAlamofire .requestJSON(.GET, "https://api.github.com/users/\(text)/repos") .debug() .catchError { error in return Observable.never() } } .map { (response, json) -> [Repository] in if let repos = Mapper<Repository>().mapArray(json) { return repos } else { return [] } } } ... } |
Hmm, first of all it seems different, there is another map()
in addition to flatMapLatest()
. But in reality it isn’t something you should be afraid of. See, in the flatMapLatest()
we do the usual network request, then if there is any error we stop the pipeline with Observable.never()
. Then we map our response from Alamofire
to Observable<[Repository]>
. We could chain the map()
in the flatMapLatest()
(after the catchError()
) as well, but we will need it outside of the flatMapLatest()
for later, so it is just the matter of preference.
Okay, this code above shouldn’t compile (because we return Observable
and we expect to return Driver
) so we need to go deeper. How do we transform Observable<[Repository]>
to Driver<[Repository]>
? Well, really simple. Any Observable
can be transformed to Driver
just by using asDriver()
operation. In our case, we will use .asDriver(onErrorJustReturn: [])
which basically means: If there is any error in the chain (but there probably isn’t, since we covered them before), return an empty array. And that’s it! Working code for now would be:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | struct RepositoryNetworkModel { ... private func fetchRepositories() -> Driver<[Repository]> { return repositoryName .flatMapLatest { text in return RxAlamofire .requestJSON(.GET, "https://api.github.com/users/\(text)/repos") .debug() .catchError { error in return Observable.never() } } .map { (response, json) -> [Repository] in if let repos = Mapper<Repository>().mapArray(json) { return repos } else { return [] } } .asDriver(onErrorJustReturn: []) // This also makes sure that we are on MainScheduler } ... } |
Perfect! See, we didn’t even touched observeOn()
or subscribeOn()
, but we already switched schedulers 2 times! First was with throttle()
and now with asDriver()
(which makes sure we are on MainScheduler
) – and that is only a beginning. The code should work now and the last thing we have to do is to connect our repositories in our RepositoryNetworkModel
to a view controller. But before that let’s replace the method with something else, because that way we create the new pipeline every time we use it. Instead, I would love a property. But not a computed property, because the result would be the same as with a method. Instead, we will create a lazy var
that will be bound to our method that fetches repositories. This way we will avoid multiple creation of the sequence. Also we need to hide everything that isn’t the property, just to be sure that everyone that uses this model will get the correct Driver
property! The only con of this solution is the fact that we have to explicitly type init in our struct, but I think this is a fair trade. So the current final model should look like the one below:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | struct RepositoryNetworkModel { lazy var rx_repositories: Driver<[Repository]> = self.fetchRepositories() private var repositoryName: Observable<String> init(withNameObservable nameObservable: Observable<String>) { self.repositoryName = nameObservable } private func fetchRepositories() -> Driver<[Repository]> { return repositoryName .flatMapLatest { text in return RxAlamofire .requestJSON(.GET, "https://api.github.com/users/\(text)/repos") .debug() .catchError { error in return Observable.never() } } .map { (response, json) -> [Repository] in if let repos = Mapper<Repository>().mapArray(json) { return repos } else { return [] } } .asDriver(onErrorJustReturn: []) // This also makes sure that we are on MainScheduler } } |
Great! Now we will just connect the data to view controller. When we wanna bind theDriver
to our table view, instead of bindTo
(that we used before) we will use drive()
operation but the syntax and everything is just the same as with bindTo
. In addition to binding our data to table view, we will also make another subscription and every time we have a count of repositories that equals 0, we will show an alert.
The final RepositoriesViewController
class presents as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 | class RepositoriesViewController: UIViewController { @IBOutlet weak var tableViewBottomConstraint: NSLayoutConstraint! @IBOutlet weak var tableView: UITableView! @IBOutlet weak var searchBar: UISearchBar! let disposeBag = DisposeBag() var repositoryNetworkModel: RepositoryNetworkModel! var rx_searchBarText: Observable<String> { return searchBar .rx_text .filter { $0.characters.count > 0 } .throttle(0.5, scheduler: MainScheduler.instance) .distinctUntilChanged() } override func viewDidLoad() { super.viewDidLoad() setupRx() } func setupRx() { repositoryNetworkModel = RepositoryNetworkModel(withNameObservable: rx_searchBarText) repositoryNetworkModel .rx_repositories .drive(tableView.rx_itemsWithCellFactory) { (tv, i, repository) in let cell = tv.dequeueReusableCellWithIdentifier("repositoryCell", forIndexPath: NSIndexPath(forRow: i, inSection: 0)) cell.textLabel?.text = repository.name return cell } .addDisposableTo(disposeBag) repositoryNetworkModel .rx_repositories .driveNext { repositories in if repositories.count == 0 { let alert = UIAlertController(title: ":(", message: "No repositories for this user.", preferredStyle: .Alert) alert.addAction(UIAlertAction(title: "OK", style: .Default, handler: nil)) if self.navigationController?.visibleViewController?.isMemberOfClass(UIAlertController.self) != true { self.presentViewController(alert, animated: true, completion: nil) } } } .addDisposableTo(disposeBag) } } |
The only new thing in this piece of code is the driveNext()
operation, but as you can guess it is just a subscribeNext
for Driver
.
And that’s it for Step 2! Onto the last one!
Step 3 – Multithreading optimization
So as you may guessed, in fact everything we did was done on a MainScheduler
. Why? Because our chain starts from searchBar.rx_text
and this one is guaranteed to be on MainScheduler
. And because everything else is by default on current scheduler – well our UI thread may get overwhelmed. How to prevent that? Switch to the background thread before the request and before the mapping, so we will just update UI on the main thread:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | struct RepositoryNetworkModel { ... private func fetchRepositories() -> Driver<[Repository]> { return repositoryName .observeOn(ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .Background)) .flatMapLatest { text in // .Background thread, network request return RxAlamofire .requestJSON(.GET, "https://api.github.com/users/\(text)/repos") .debug() .catchError { error in return Observable.never() } } .observeOn(ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .Background)) .map { (response, json) -> [Repository] in // again back to .Background, map objects if let repos = Mapper<Repository>().mapArray(json) { return repos } else { return [] } } .asDriver(onErrorJustReturn: []) // This also makes sure that we are on MainScheduler } ... } |
But why do we use observeOn()
two times in the same way? Because we don’t really know from the code above that requestJSON
will return data on the same thread it started. So we’re gonna make sure it is again on the background thread for mapping, which is quite heavy.
Alright, now we have mapping on background threads, result delivered on UI thread, can we ask for more?! Of course we can! We want our user to know that some network request is going on. For that we will use the UIApplication.sharedApplication().networkActivityIndicatorVisible
property, widely known as a spinner. But now we have to be careful with threads, since we want to update UI in the middle of a request/mapping operations. Also we will use a nice method called doOn()
which can do whatever you want on specific events (like .Next, .Error etc.). Let’s say we want to show spinner before the flatMapLatest()
: doOn
is the one that can do that. We just need to switch to MainScheduler
before the actions are performed.
So our full code for fetching repositories would be as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | struct RepositoryNetworkModel { lazy var rx_repositories: Driver<[Repository]> = self.fetchRepositories() private var repositoryName: Observable<String> init(withNameObservable nameObservable: Observable<String>) { self.repositoryName = nameObservable } private func fetchRepositories() -> Driver<[Repository]> { return repositoryName .subscribeOn(MainScheduler.instance) // Make sure we are on MainScheduler .doOn(onNext: { response in UIApplication.sharedApplication().networkActivityIndicatorVisible = true }) .observeOn(ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .Background)) .flatMapLatest { text in // .Background thread, network request return RxAlamofire .requestJSON(.GET, "https://api.github.com/users/\(text)/repos") .debug() .catchError { error in return Observable.never() } } .observeOn(ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .Background)) .map { (response, json) -> [Repository] in // again back to .Background, map objects if let repos = Mapper<Repository>().mapArray(json) { return repos } else { return [] } } .observeOn(MainScheduler.instance) // switch to MainScheduler, UI updates .doOn(onNext: { response in UIApplication.sharedApplication().networkActivityIndicatorVisible = false }) .asDriver(onErrorJustReturn: []) // This also makes sure that we are on MainScheduler } } |
And that’s it! Now you know why we didn’t care about threads when parsing: the extension of Moya-ModelMapper
switches schedulers for us.
Full app should be working as expected now:
This one was even longer than the last one so once again thank you for your patience, persistence and willingness to follow this not-so-easy example with me.
As always, please keep me updated with your feedback, ideas and improvements on twitter, or just comment here. Your messages always make my day so thank you for that as well. ✌️
Also I’m improving the resources for RxSwift all the time, so be sure to check them out. And subscribe(?)
to get the latest info about our series or RxSwift in general.
You can find complete source code on Droids on Roids’s GitHub repository and here you can check other RxSwift examples!
Read more articles about RxSwift
RxSwift by Examples #1 – The Basics
RxSwift by Examples #2 – Observable and the Bind
RxSwift by Examples #3 – Networking
RxSwift by Examples #4 – Multithreading
About the author
Ready to take your business to the next level with a digital product?
We'll be with you every step of the way, from idea to launch and beyond!
Really great post!!! I think the explanation regarding the difference between subscribeOn and observeOn it’s one of the best I saw it. ?
Thank you so much @victorsigler:disqus! ? At first both
subscribeOn()
andobserveOn()
sounded really similar to me and I was flabbergasted once I found out their true meaning. I’m glad that my explanation was reasonable and you liked it!After I read in deep the Driver unit in the RxSwift documentation I think it would be great in your post if you clarify in the case where you make the call first to
drive
and then todriveNext
to show anUIAlertController
that this only make a HTTP request because the Drive unit share side effects between subscribers, at first this make so confuse about it. Nevertheless good post again ?Thanks for your explain about the Drive.
This series is excellent. I would love to seem some examples of writing tests with an RxSwift application. Thanks for the great work!
Thank you so much @damianesteban:disqus! ? Both testing and error handling are on my list currently, but I want to write how would you write tests in real-world application, and that kinda takes time. Nonetheless, it will happen for sure!
Awesome, thank you. Looking forward to it when you get the chance.
Great tutorial!
But can u please explain when the doOn block calls?
Hey @gusevslava:disqus. I’m not sure I understand your question, but let’s try ?
If you wanted to ask when to use doOn blocks, it is basically a help to do side effects in the pipeline (chain of operators). I use it mostly with flatMap, if I want to do something before transforming the sequence to another (really, and I mean really helpful).
If you wanted to ask when is doOn called on the other hand, the answer is pretty simple. Because this is declarative programming, you specifically tell the pipeline when should the doOn be called. Let’s say you add the doOn block before the flatMap and guess what – the doOn block will be called before flatMap.
Hope it helps! If not, please give me more specifics ?
Excellent series. I’m also looking forward to something on testing. Thanks
Great article!
One question: Why do you use .subscribeOn(MainScheduler.instance) to ensure that networkActivityIndicatorVisible property is accessed on the main thread in .doOn(onNext:..) ? Wouldn’t .observeOn(MainScheduler.instance) have the same effect?
Hey Kostas, first of all, thanks!
About your question, sure, in this case both subscribeOn and observeOn are sufficient, because this is the start of the chain. However, if I’d put the subscribeOn block somewhere down the chain, now only subscribeOn would be enough. Good point tho!
Lukasz many thanks,
As far as I understand in case of network issue, the app will halt. Rx network JSON request (RxAlamofire.requestJSON) will catch error (catchError:, and the following observeOn won’t return anything(it won’t be called at all)
. Am I right?
Hey @siarheibrazil:disqus. You are right, in case of any errors of Alamofire, the request will transform into empty sequence, thus operators like
observeOn
anddoOn
etc. won’t be called because there will be no item in this specific errored sequence.Give that men a medal!!!
Thanks! Really happy you’ve enjoyed it 🙂
Hey Lukas Mroz…great article…
This article is about a year old right now, and you referred to that testing follow-up about 10 months ago. Is it still going to happen, or is it an abandoned ship? Just asking…
Many thanks for this series, anyway…it’s great value even as it is.
Awesome series ?
Looks great, but the details of the networking dependencies are updated (making the tutorial outdated) so fast that it makes this very difficult to follow (XCode 9.1, swift 4.0). In the future, it would be great to not use so many of these dependencies and use more low level approaches, even if they are not so hip. Cheers and thanks for the series.