文中写一些 swift 方法签名时,会带上 label,如
subscribe(_ subscriber:)
,正常作为 selector 的写法时会忽略掉 label,只写作subscribe(_:)
,本文特意带上 label 以使含义更清晰。
overview
在 app 运行过程中会发生各种各样的异步事件,如网络请求的返回,notification 的发送等。在处理这些异步事件时,我们经常会使用异步回调、代理方法等。combine 框架提供了一种声明式的 swift api,可以将一个异步事件的处理逻辑表示成单独的一个处理链,链上的每个节点接收上一个节点的处理结果,执行自己的处理逻辑,然后传递给下一个节点。
combine 框架采用 publisher-subscriber 模式:
- 协议
publisher
表示一种能够随时间产生一系列值的类型。combine 还为这些 publishers 提供了许多 operators 来处理从上游接收的值,然后再重新发送到下游。 - 在这个处理链的最末端,是由协议
subscriber
表示的订阅者类型,接收并处理发送给它的值。 - 当一个 subscriber 订阅到一个 publisher 上时,会接收到一个新生成的由协议
subscription
表示的类型对象,subscriber 通过该对象来向 publisher 请求值,而 publisher 也只有在接收到 subscriber 的显式请求时才会分发值。
publisher 和 subscriber 的交互流程
- 首先,subscriber 调用 publisher 的
subscribe(_ subscriber:)
方法,将自己作为参数传过去;subscribe(_ subscriber:)
会接着调用 publisher 的receive(subscriber:)
方法。 - 方法
receive(subscriber:)
是publisher
协议的要求方法,所有遵循 publisher 的类型都必须实现该方法,在这个方法里处理 subscriber 的订阅逻辑。但是使用方又不能直接调用该方法,而必须通过调用扩展方法subscribe(_ subscriber:)
来发起订阅。 - publisher 接受订阅之后,会创建一个新的 subscription 对象,然后调用 subscriber 的
receive(subscription:)
方法。协议 subscription 约束了一个方法request(_ demand:)
,subscriber 通过调用该方法来说明自己需要请求多少的值。只有 request 之后,publisher 才会向该 subscriber 发布值。 - publisher 通过调用 subscriber 的
receive(_ input:)
方法来向其发布值,并在结束时调用 subscriber 的receive(completion:)
来进行通知。注意这里面说的调用并不一定指 publisher 对象持有 subscriber 对象,然后直接调用 subscriber 对象的上述方法,具体是否持有是具体实现细节,也有可能通过某些中间对象间接调用。anyway,实现 subscriber 协议的类型必须实现这两个方法(以及开头的receive(subscription:)
方法),在这些方法中处理从 publisher 那里接收到的值。
combine 中的 publishers
内置 publishers
combine 框架提供了许多内置的 publisher 类型供我们使用,如:
- 为 sequence 类型实现的 publisher 扩展;
- 为 notificationcenter 实现的
publisher(for:object:)
扩展; - urlsession 的
datataskpublisher(for:)
扩展; - …
subject
subject 给我们提供了一种向流中插入值的方式,为我们在存量命令式编程的代码中引入 combine 提供了一个强大的工具。subject 本身即是一个 publisher,下游可以正常去 subscribe 它,然后使用方通过调用它的 send(_ value:)
方法来发布一个值。combine 提供了两种 subject:
- currentvaluesubject:如其名,会维护一个当前值,初始化时需要传入一个初始值作为当前值,后续通过调用
send(_ value:)
来更新当前值。当一个新的 subscriber 订阅时,会马上收到一次最新的当前值。 - passthroughsubject:不同于 currentvaluesubject,内部没有缓存状态,每次调用
send(_ value:)
时才会向下游发布值。
@published
该 property wrapper 修饰 class 的某个属性,为其生成一个 publisher,使用方通过 $
加上属性名来访问该 publisher。当属性值变化时,该 publisher 会在属性的 willset 里发布新值,因此需要留意属性值本身尚未被更新仍是旧值,传给 subscriber 的值是新值。
combine 中的 subscribers
combine 提供了两个内置的 subscribers:subscribers.sink 和 subscribers.assign,但一般不直接创建这两个的实例,而是通过 publisher 的两种扩展方法 sink 和 assign 来获取类型抹除后的 anycancellable 对象。
sink
sink subscriber 创建的时候会立即调用 subscription 对象的 request(.unlimited)
,后面会详细介绍 demand 的用法,这里需要留意的是一旦请求了 .unlimited 的 demand 之后,便无法再调整了,也就是说只要 publisher 不断地产生新的值,sink 就会持续地接收到新值,直至被 cancel。
publisher 有两个 sink 扩展方法:
sink(receivecompletion:receivevalue:)
:两个闭包的含义和用法不必多解释;sink(receivevalue:)
:只有当 publisher 的 failure associated type 是 never 时才可以使用该方法。
assign
assign subscriber 会将接收到的值赋值给一个类对象的属性或者一个另一个 published publisher 上,它对 publisher 的 demand 也是 .unlimited。
assign
:(to keypath: referencewritablekeypath , on object: root) - 因为 keypath 类型是 referencewritablekeypath,所以 object 只能是一个类实例;
- 注意该 assign subscriber 会强持有 object 对象,除非上游发布了一个 completion。
assign(to published: inout published
:.publisher) - 使用该 subscriber 可以将上游的值通过一个 @published 修饰的属性重新发布;
- 该方法没有返回值,当关联的 published 实例析构时会自动地 cancel 掉订阅。
publisher 的 operators
combine 还为 publisher 添加了许多扩展方法,称为 operators,它们返回的也是一个 publisher,因此可以进行链式调用。每个 operator 接收一个上游 publisher,处理转换上游发布的值,然后重新发布到下游。
具体的 operator 及其用法详见。
connectable
常用的 subject 如 sink(receivevalue:)
会在订阅到一个 publisher 时立即发起一个 unlimited demand,对应的 publisher 如果此时有值则会马上发布,但这时使用方并不一定准备好接收并处理数据。另一种情况是如果一个 publisher 期望有多个 subscribers,但由于每个 subscriber 订阅的时机不一样,有可能当第一个 subscriber 订阅时,publisher 就已经把值给发布出去了,这样当第二个 subscriber 订阅时,只会收到一个 completion。
combine 提供一个 connectablepublisher
协议来支持手动控制开始发布值的时机,遵循该协议的 publisher 只有在显示调用 connect()
方法之后,才会开始值发布的过程,在这之前,即使满足 publisher 发布值的条件,也不会进行发布。
使用 publisher 的 makeconnectable()
operator 来将一个已有的 publisher 包装成一个 publishers.makeconnectable
实例,该实例便是一个 connectablepublisher,之后使用方便可在合适的时机调用其 connect() 方法来开启值的发布。
combine 中有些 publisher 已经实现了 connectablepublisher 协议,如 publishers.multicast
,timer.timerpublisher
等,有时在一些使用这些 publisher 的简单场景中,显式调用 connect()
反而显得繁琐,因此 connectablepublisher 又提供了一个 autoconnect()
operator,该操作符会在一个 subscriber 订阅它时立刻调用 connect 方法。
在 combine 中,一个 publisher 只有在被 subscriber 订阅并且发起要求的时候才会产生值。subscriber 有下面两种方式来发起要求:
- 通过调用 subscription 对象的
request(_ demand:)
方法,subscription 对象在发起订阅时由receive(subscription:)
方法传入; - 每次
receive(_ input:)
调用时,可以返回一个新的 demand。
demand 表示 subscriber 需要多少值,combine 提供的 api 里面,有 .none
, .unlimited
, 以及指定具体数目的 .max(int)
。demand 是可加的,如一个 subscriber 要求了 2 个值,然后又要求了 .max(3),则其订阅的 publisher 现在共有 5 个未满足的值,每当 publisher 发布一个值,其为满足的 demand 便随之减 1,这也是唯一使其为满足的 demand 数值减少的方式,因为 demand 不支持负值。而一旦 subscriber 要求了 .unlimited 的 demand,则后续就无法继续再同 publisher 协商了。
自定义 subscriber
内置的 subscriber sink 和 assign 都是一开始就请求了 .unlimited 的 demand,如果需要精细化地控制 publisher 发送值的 rate,可以实现一个自定义的 subscriber,如:
class mysubscriber: subscriber {
typealias input = date
typealias failure = never
var subscription: subscription?
func receive(subscription: subscription) {
print("published received")
self.subscription = subscription
dispatchqueue.main.asyncafter(deadline: .now() 5) {
subscription.request(.max(3))
}
}
func receive(_ input: date) -> subscribers.demand {
print("\(input) \(date())")
return subscribers.demand.none
}
func receive(completion: subscribers.completion) {
print ("--done--")
}
}
自定义 subscriber 的要点即是实现协议约束的三个方法,自定义的 subscriber 可以自己持有传入的 subscription 对象,以实现精细化的控制。这种由 subscriber 来控制流速的行为称为 back pressure。
back-pressure 操作符
除了自定义 subscriber,combine 也提供了一些操作符给内置的 subscriber 使用以协助控制流速,这些操作符内部实现一些缓存相关的逻辑:
-
buffer:最大缓存一定数目的值,超出后丢弃或抛出错误;
-
debounce:设定一个 duetime,假设时间为 t0 上游发布一个值,这时 debounce 不会立刻重新发布,而是创建一个重发布任务在 t0 duetime 之后执行(注意这里的任务是为了便于理解抽象出的概念,不代表具体实现真正地创建了一个任务,不过有可能确实是这样实现的);但如果在这期间,在时间 t1 时上游又发布了一个值,则之前的延时任务会被丢弃,会重新创建一个任务在 t1 duetime 之后才会重新发布,如果在这期间上游又发送了一个值,则以此类推。如:
let bounces:[(int,timeinterval)] = [ (0, 0), (1, 0.25), // 0.25s interval since last index (2, 1), // 0.75s interval since last index (3, 1.25), // 0.25s interval since last index (4, 1.5), // 0.25s interval since last index (5, 2) // 0.5s interval since last index ] let subject = passthroughsubject
() cancellable = subject .debounce(for: .seconds(0.5), scheduler: runloop.main) .sink { index in print ("received index \(index)") } for bounce in bounces { dispatchqueue.main.asyncafter(deadline: .now() bounce.1) { subject.send(bounce.0) } } // prints: // received index 1 // received index 4 // received index 5 // here is the event flow shown from the perspective of time, showing value delivery through the `debounce()` operator: // time 0: send index 0. (republish task0 at: 0.5) // time 0.25: send index 1. (task0 is discarded, republish task1 at 0.25 0.5 = 0.75) // time 0.75: debounce period ends, publish index 1. (execute task1) // time 1: send index 2. (republish task2 at: 1 0.5 = 1.5) // time 1.25: send index 3. (task2 is discarded, republish task3 at 1.25 0.5 = 1.75) // time 1.5: send index 4. (task3 is discarded, republish task4 at 1.5 0.5 = 2.0) // time 2: debounce period ends, publish index 4. also, send index 5. (execute task4. republish task5 at: 2 0.5 = 2.5) // time 2.5: debounce period ends, publish index 5. (execute task5) -
throttle:设定一个 interval,每次达到时间时,会检查这一小段时间内有无值被发布,如果有的话,根据设定的 latest 参数决定将最新或最旧的值发布到下游。
-
collect:从上游接收到值时,先搜集起来,超过给定的数目或者超过给定的时间间隔之后,再把所有搜集到的值发给下游。
许多人经常搞不清 debounce 和 throttle 的区别,从上面的解释可以很清楚地看出二者的机制和差异,throttle 很稳定地定期发布一次(如有值可发布),而 debounce 如果上游频繁地发布值的话,可能要等好久才会发布一次,这也正是 debounce 的作用,比如在输入框输入文字的场景。
apple 家的新东西都有一个特点:只能在比较新的操作系统版本上使用,比如 combine 要求 ios 13 以上才能使用,而且一些 api 更新可能会要求更新的 os 版本。然而有位大佬 开发了 combine 的开源实现:,完全兼容 combine 的 api,可以运行在老的 ios 和 macos 版本上,甚至支持 windows、linux 和 wasm。
我们来简单看一下内置的 publisher 之一的 passthroughsubject 的开源实现。
先从订阅流程看起,可以结合上面的图作为参照。首先是 publisher 的扩展方法 subscribe(_ subscribe)
,其实就是简单地调用了一下具体 publisher 类型的协议约束方法,传入参数 subscriber: receive(subscriber: subscriber)
。passthroughsubject 的协议方法实现为:
public func receive(subscriber: downstream)
where output == downstream.input, failure == downstream.failure
{
lock.lock()
if active {
let conduit = conduit(parent: self, downstream: subscriber) // a.
downstreams.insert(conduit) // b.
lock.unlock()
subscriber.receive(subscription: conduit) // c.
} else {
let completion = self.completion!
lock.unlock()
subscriber.receive(subscription: subscriptions.empty) // d.
subscriber.receive(completion: completion) // e.
}
}
先看 c 处,最终调用了 subscriber 的 receive(subscription:)
,传入的 subscription 对象在 a 处创建,在 b 处被 publisher 插入到自己内部的一个 downstreams 数组中,该对象传给 subscriber 之后也会被 subscriber 对象持有,用于向 publisher 要求 demand、执行 cancel。
我们上面多次介绍到 subscription 类型,但 combine 并没有提供具体的实现类型,因为它其实是某个具体 publisher 的实现的一部分,这里的 conduit 便是 passthroughsubject 的 subscription 实现类。我们接着看 passthroughsubject 的另一个协议约束方法:
// passthroughsubject 的 send(_ input:) 实现
public func send(_ input: output) {
lock.lock()
guard active else {
lock.unlock()
return
}
let downstreams = self.downstreams
lock.unlock()
for conduit in downstreams {
conduit.offer(input)
}
}
// conduit 的 offer(_ output:) 实现
override func offer(_ output: output) {
lock.lock()
guard demand > 0, let downstream = downstream else {
lock.unlock()
return
}
demand -= 1 // a.
lock.unlock()
downstreamlock.lock()
let newdemand = downstream.receive(output) // b.
downstreamlock.unlock()
guard newdemand > 0 else { return }
lock.lock()
demand = newdemand // c.
lock.unlock()
}
passthroughsubject 的 send(_ input:)
会调用 conduit 的 offer(_ output:)
,在 offer 方法中,a 处将 demand 减 1,b 处调用 subscriber 的 receive(_ input:) 方法,c 处再将返回的 newdemand 加到现有 demand 上面,和前文描述的逻辑完全一致。
其他方法实现、subscriber 的实现、各种操作符的实现可以直接翻源码,了解各个协议之间的约束之后非常简洁易读。