RxSwift - subscribeOn vs observeOn
RxSwift 中线程切换是比较方便的,但其中也有值得注意的一些事项。
Observable subscriptions
订阅(subscribing)和观察(observing)方面的术语有点混乱,所以让我们首先解决这个问题。
让我们看一下可观察订阅的工作原理。我们可以将订阅分成3部分:

- 首先定义了一个
Observable, 在某些情况下,在闭包中提供一些代码来执行工作并向任何观察者发出元素。当你在创建Observable的时候,这些代码并不是立即执行,而是存储起来以备后用。如果没有observers,observable将不会做任何事,直到有observer来订阅他。 - 在为订阅建模时,您可以使用一些运算符(如map,filter等)来处理发出的元素。添加运算符仍然不执行任何工作, 只是创建一个“更专业”的
Observable。 - 只有当在 一个
Observable上调用subscribe(...)方法时,才“打开了”一个Observable。subscribe(...)方法调用后,1 中 定义的代码才真正的执行。
这里有两个要点:
- subscription code 是调用
subscribe()方法调用后被执行的代码, 位于 Observable.create { … } 的闭包内。它是创建 订阅(subscription)和产生元素的代码。 - observation code 是观察发射出来的元素的代码,是在 onNext: { … }, onCompleted: {…} 中提供的代码。这里是你观察元素的地方。

线程切换
subscribeOn:
此操作符将更改 subscription code 的执行线程。subscribeOn 将会改变无论是在它之前调用还是之后调用的方法的执行线程,无关顺序。 subscribeOn 可以放在任何位置(有例外,见下文)。
observeOn:
此操作符将会更改 observer code 的执行线程。
observeOn 只工作在下游
let diseposeBag = DisposeBag()
Observable<Int>.create { observer in
observer.onNext(1)
sleep(1)
observer.onNext(2)
return Disposables.create()
}
.map { value -> Int in
print("\n\n 😀 Queue: \(self.currentQueueName() ?? "queue")")
return value * 2
}
.observeOn(SerialDispatchQueueScheduler(qos: .background))
.map { value -> Int in
print(" 😀 Queue: \(self.currentQueueName() ?? "queue")")
return value * 3
}
.observeOn(MainScheduler.instance)
.subscribe(onNext: { element in
print(" 😀 Queue: \(element) \(self.currentQueueName() ?? "queue")")
}).disposed(by: disposeBag)

subscribeOn位置无关紧要。它工作在下游和上游
Observable<Int>.create { observer in
observer.onNext(1)
sleep(1)
observer.onNext(2)
return Disposables.create()
}
.map { value -> Int in
print("\n\n 😀 Queue: \(self.currentQueueName() ?? "queue")")
return value * 2
}
.subscribeOn(SerialDispatchQueueScheduler(qos: .background))
.map { value -> Int in
print(" 😀 Queue: \(self.currentQueueName() ?? "queue")")
return value * 3
}
.observeOn(MainScheduler.instance)
.subscribe(onNext: { element in
print(" 😀 Queue: \(element) \(self.currentQueueName() ?? "queue")")
}).disposed(by: disposeBag)

连续的 subscribeOn 不会连续改变线程
Observable<Int>.create { observer in
observer.onNext(1)
sleep(1)
observer.onNext(2)
return Disposables.create()
}
.subscribeOn(SerialDispatchQueueScheduler(qos: .background))
.map { value -> Int in
print("\n\n 😀 Queue: \(self.currentQueueName() ?? "queue")")
return value * 2
}
.subscribeOn(MainScheduler.instance)
.map { value -> Int in
print(" 😀 Queue: \(self.currentQueueName() ?? "queue")")
return value * 3
}
.observeOn(MainScheduler.instance)
.subscribe(onNext: { element in
print(" 😀 Queue: \(element) \(self.currentQueueName() ?? "queue")")
}).disposed(by: disposeBag)
view raw
连续的 observeOn 将连续改变线程
Observable<Int>.create { observer in
observer.onNext(1)
sleep(1)
observer.onNext(2)
return Disposables.create()
}
.observeOn(ConcurrentDispatchQueueScheduler(qos: .background))
.map { value -> Int in
print("\n\n 😀 Queue: \(self.currentQueueName() ?? "queue")")
return value * 2
}
.observeOn(SerialDispatchQueueScheduler(qos: .background))
.map { value -> Int in
print(" 😀 Queue: \(self.currentQueueName() ?? "queue")")
return value * 3
}
.observeOn(MainScheduler.instance)
.subscribe(onNext: { element in
print(" 😀 Queue: \(element) \(self.currentQueueName() ?? "queue")")
}).disposed(by: disposeBag)
subscribeOn 无法覆盖 observeOn 的线程更改
Observable<Int>.create { observer in
observer.onNext(1)
sleep(1)
observer.onNext(2)
return Disposables.create()
}
.observeOn(SerialDispatchQueueScheduler(qos: .background))
.map { value -> Int in
print("\n\n 😀 Queue: \(self.currentQueueName() ?? "queue")")
return value * 2
}
.subscribeOn(MainScheduler.instance)
.map { value -> Int in
print(" 😀 Queue: \(self.currentQueueName() ?? "queue")")
return value * 3
}
.observeOn(MainScheduler.instance)
.subscribe(onNext: { element in
print(" 😀 Queue: \(element) \(self.currentQueueName() ?? "queue")")
}).disposed(by: disposeBag)

currentQueueName
func currentQueueName() -> String? {
let name = __dispatch_queue_get_label(nil)
return String(cString: name, encoding: .utf8)
}