深入理解 swift combine-kb88凯时官网登录

来自:
时间:2024-05-04
阅读:

文中写一些 swift 方法签名时,会带上 label,如 subscribe(_ subscriber:),正常作为 selector 的写法时会忽略掉 label,只写作 subscribe(_:) ,本文特意带上 label 以使含义更清晰。

overview

在 app 运行过程中会发生各种各样的异步事件,如网络请求的返回,notification 的发送等。在处理这些异步事件时,我们经常会使用异步回调、代理方法等。combine 框架提供了一种声明式的 swift api,可以将一个异步事件的处理逻辑表示成单独的一个处理链,链上的每个节点接收上一个节点的处理结果,执行自己的处理逻辑,然后传递给下一个节点。

combine 框架采用 publisher-subscriber 模式:

  • 协议 publisher 表示一种能够随时间产生一系列值的类型。combine 还为这些 publishers 提供了许多 operators 来处理从上游接收的值,然后再重新发送到下游。
  • 在这个处理链的最末端,是由协议 subscriber 表示的订阅者类型,接收并处理发送给它的值。
  • 当一个 subscriber 订阅到一个 publisher 上时,会接收到一个新生成的由协议 subscription 表示的类型对象,subscriber 通过该对象来向 publisher 请求值,而 publisher 也只有在接收到 subscriber 的显式请求时才会分发值。

publisher 和 subscriber 的交互流程

深入理解 swift combine

  • 首先,subscriber 调用 publisher 的 subscribe(_ subscriber:) 方法,将自己作为参数传过去;subscribe(_ subscriber:) 会接着调用 publisher 的 receive(subscriber:) 方法。
  • 方法 receive(subscriber:)publisher 协议的要求方法,所有遵循 publisher 的类型都必须实现该方法,在这个方法里处理 subscriber 的订阅逻辑。但是使用方又不能直接调用该方法,而必须通过调用扩展方法 subscribe(_ subscriber:) 来发起订阅。
  • publisher 接受订阅之后,会创建一个新的 subscription 对象,然后调用 subscriber 的 receive(subscription:) 方法。协议 subscription 约束了一个方法 request(_ demand:) ,subscriber 通过调用该方法来说明自己需要请求多少的值。只有 request 之后,publisher 才会向该 subscriber 发布值。
  • publisher 通过调用 subscriber 的 receive(_ input:) 方法来向其发布值,并在结束时调用 subscriber 的 receive(completion:) 来进行通知。注意这里面说的调用并不一定指 publisher 对象持有 subscriber 对象,然后直接调用 subscriber 对象的上述方法,具体是否持有是具体实现细节,也有可能通过某些中间对象间接调用。anyway,实现 subscriber 协议的类型必须实现这两个方法(以及开头的 receive(subscription:) 方法),在这些方法中处理从 publisher 那里接收到的值。

combine 中的 publishers

内置 publishers

combine 框架提供了许多内置的 publisher 类型供我们使用,如:

  • 为 sequence 类型实现的 publisher 扩展;
  • 为 notificationcenter 实现的 publisher(for:object:) 扩展;
  • urlsession 的 datataskpublisher(for:) 扩展;

subject

subject 给我们提供了一种向流中插入值的方式,为我们在存量命令式编程的代码中引入 combine 提供了一个强大的工具。subject 本身即是一个 publisher,下游可以正常去 subscribe 它,然后使用方通过调用它的 send(_ value:) 方法来发布一个值。combine 提供了两种 subject:

  • currentvaluesubject:如其名,会维护一个当前值,初始化时需要传入一个初始值作为当前值,后续通过调用 send(_ value:) 来更新当前值。当一个新的 subscriber 订阅时,会马上收到一次最新的当前值。
  • passthroughsubject:不同于 currentvaluesubject,内部没有缓存状态,每次调用 send(_ value:) 时才会向下游发布值。

@published

该 property wrapper 修饰 class 的某个属性,为其生成一个 publisher,使用方通过 $ 加上属性名来访问该 publisher。当属性值变化时,该 publisher 会在属性的 willset 里发布新值,因此需要留意属性值本身尚未被更新仍是旧值,传给 subscriber 的值是新值。

combine 中的 subscribers

combine 提供了两个内置的 subscribers:subscribers.sink 和 subscribers.assign,但一般不直接创建这两个的实例,而是通过 publisher 的两种扩展方法 sink 和 assign 来获取类型抹除后的 anycancellable 对象。

sink

sink subscriber 创建的时候会立即调用 subscription 对象的 request(.unlimited) ,后面会详细介绍 demand 的用法,这里需要留意的是一旦请求了 .unlimited 的 demand 之后,便无法再调整了,也就是说只要 publisher 不断地产生新的值,sink 就会持续地接收到新值,直至被 cancel。

publisher 有两个 sink 扩展方法:

  • sink(receivecompletion:receivevalue:) :两个闭包的含义和用法不必多解释;
  • sink(receivevalue:) :只有当 publisher 的 failure associated type 是 never 时才可以使用该方法。

assign

assign subscriber 会将接收到的值赋值给一个类对象的属性或者一个另一个 published publisher 上,它对 publisher 的 demand 也是 .unlimited。

  • assign(to keypath: referencewritablekeypath, on object: root)
    • 因为 keypath 类型是 referencewritablekeypath,所以 object 只能是一个类实例;
    • 注意该 assign subscriber 会强持有 object 对象,除非上游发布了一个 completion。
  • assign(to published: inout published.publisher)
    • 使用该 subscriber 可以将上游的值通过一个 @published 修饰的属性重新发布;
    • 该方法没有返回值,当关联的 published 实例析构时会自动地 cancel 掉订阅。

publisher 的 operators

combine 还为 publisher 添加了许多扩展方法,称为 operators,它们返回的也是一个 publisher,因此可以进行链式调用。每个 operator 接收一个上游 publisher,处理转换上游发布的值,然后重新发布到下游。

具体的 operator 及其用法详见。

connectable

常用的 subject 如 sink(receivevalue:) 会在订阅到一个 publisher 时立即发起一个 unlimited demand,对应的 publisher 如果此时有值则会马上发布,但这时使用方并不一定准备好接收并处理数据。另一种情况是如果一个 publisher 期望有多个 subscribers,但由于每个 subscriber 订阅的时机不一样,有可能当第一个 subscriber 订阅时,publisher 就已经把值给发布出去了,这样当第二个 subscriber 订阅时,只会收到一个 completion。

combine 提供一个 connectablepublisher 协议来支持手动控制开始发布值的时机,遵循该协议的 publisher 只有在显示调用 connect() 方法之后,才会开始值发布的过程,在这之前,即使满足 publisher 发布值的条件,也不会进行发布。

使用 publisher 的 makeconnectable() operator 来将一个已有的 publisher 包装成一个 publishers.makeconnectable 实例,该实例便是一个 connectablepublisher,之后使用方便可在合适的时机调用其 connect() 方法来开启值的发布。

combine 中有些 publisher 已经实现了 connectablepublisher 协议,如 publishers.multicasttimer.timerpublisher 等,有时在一些使用这些 publisher 的简单场景中,显式调用 connect() 反而显得繁琐,因此 connectablepublisher 又提供了一个 autoconnect() operator,该操作符会在一个 subscriber 订阅它时立刻调用 connect 方法。

在 combine 中,一个 publisher 只有在被 subscriber 订阅并且发起要求的时候才会产生值。subscriber 有下面两种方式来发起要求:

  • 通过调用 subscription 对象的 request(_ demand:) 方法,subscription 对象在发起订阅时由 receive(subscription:) 方法传入;
  • 每次 receive(_ input:) 调用时,可以返回一个新的 demand。

demand 表示 subscriber 需要多少值,combine 提供的 api 里面,有 .none.unlimited , 以及指定具体数目的 .max(int) 。demand 是可加的,如一个 subscriber 要求了 2 个值,然后又要求了 .max(3),则其订阅的 publisher 现在共有 5 个未满足的值,每当 publisher 发布一个值,其为满足的 demand 便随之减 1,这也是唯一使其为满足的 demand 数值减少的方式,因为 demand 不支持负值。而一旦 subscriber 要求了 .unlimited 的 demand,则后续就无法继续再同 publisher 协商了。

自定义 subscriber

内置的 subscriber sink 和 assign 都是一开始就请求了 .unlimited 的 demand,如果需要精细化地控制 publisher 发送值的 rate,可以实现一个自定义的 subscriber,如:

class mysubscriber: subscriber {
    typealias input = date
    typealias failure = never
    var subscription: subscription?
    
    func receive(subscription: subscription) {
        print("published                             received")
        self.subscription = subscription
        dispatchqueue.main.asyncafter(deadline: .now()   5) {
            subscription.request(.max(3))
        }
    }
    
    func receive(_ input: date) -> subscribers.demand {
        print("\(input)             \(date())")
        return subscribers.demand.none
    }
    
    func receive(completion: subscribers.completion) {
        print ("--done--")
    }
}

自定义 subscriber 的要点即是实现协议约束的三个方法,自定义的 subscriber 可以自己持有传入的 subscription 对象,以实现精细化的控制。这种由 subscriber 来控制流速的行为称为 back pressure。

back-pressure 操作符

除了自定义 subscriber,combine 也提供了一些操作符给内置的 subscriber 使用以协助控制流速,这些操作符内部实现一些缓存相关的逻辑:

  • buffer:最大缓存一定数目的值,超出后丢弃或抛出错误;

  • debounce:设定一个 duetime,假设时间为 t0 上游发布一个值,这时 debounce 不会立刻重新发布,而是创建一个重发布任务在 t0 duetime 之后执行(注意这里的任务是为了便于理解抽象出的概念,不代表具体实现真正地创建了一个任务,不过有可能确实是这样实现的);但如果在这期间,在时间 t1 时上游又发布了一个值,则之前的延时任务会被丢弃,会重新创建一个任务在 t1 duetime 之后才会重新发布,如果在这期间上游又发送了一个值,则以此类推。如:

    let bounces:[(int,timeinterval)] = [
        (0, 0),
        (1, 0.25),  // 0.25s interval since last index
        (2, 1),     // 0.75s interval since last index
        (3, 1.25),  // 0.25s interval since last index
        (4, 1.5),   // 0.25s interval since last index
        (5, 2)      // 0.5s interval since last index
    ]
    let subject = passthroughsubject()
    cancellable = subject
        .debounce(for: .seconds(0.5), scheduler: runloop.main)
        .sink { index in
            print ("received index \(index)")
        }
    for bounce in bounces {
        dispatchqueue.main.asyncafter(deadline: .now()   bounce.1) {
            subject.send(bounce.0)
        }
    }
    // prints:
    //  received index 1
    //  received index 4
    //  received index 5
    //  here is the event flow shown from the perspective of time, showing value delivery through the `debounce()` operator:
    //  time 0: send index 0. (republish task0 at: 0.5)
    //  time 0.25: send index 1. (task0 is discarded, republish task1 at 0.25   0.5 = 0.75)
    //  time 0.75: debounce period ends, publish index 1. (execute task1)
    //  time 1: send index 2. (republish task2 at: 1   0.5 = 1.5)
    //  time 1.25: send index 3. (task2 is discarded, republish task3 at 1.25   0.5 = 1.75)
    //  time 1.5: send index 4. (task3 is discarded, republish task4 at 1.5   0.5 = 2.0)
    //  time 2: debounce period ends, publish index 4. also, send index 5. (execute task4. republish task5 at: 2   0.5 = 2.5)
    //  time 2.5: debounce period ends, publish index 5. (execute task5)
    
  • throttle:设定一个 interval,每次达到时间时,会检查这一小段时间内有无值被发布,如果有的话,根据设定的 latest 参数决定将最新或最旧的值发布到下游。

  • collect:从上游接收到值时,先搜集起来,超过给定的数目或者超过给定的时间间隔之后,再把所有搜集到的值发给下游。

许多人经常搞不清 debounce 和 throttle 的区别,从上面的解释可以很清楚地看出二者的机制和差异,throttle 很稳定地定期发布一次(如有值可发布),而 debounce 如果上游频繁地发布值的话,可能要等好久才会发布一次,这也正是 debounce 的作用,比如在输入框输入文字的场景。

apple 家的新东西都有一个特点:只能在比较新的操作系统版本上使用,比如 combine 要求 ios 13 以上才能使用,而且一些 api 更新可能会要求更新的 os 版本。然而有位大佬 开发了 combine 的开源实现:,完全兼容 combine 的 api,可以运行在老的 ios 和 macos 版本上,甚至支持 windows、linux 和 wasm。

我们来简单看一下内置的 publisher 之一的 passthroughsubject 的开源实现。

先从订阅流程看起,可以结合上面的图作为参照。首先是 publisher 的扩展方法 subscribe(_ subscribe) ,其实就是简单地调用了一下具体 publisher 类型的协议约束方法,传入参数 subscriber: receive(subscriber: subscriber) 。passthroughsubject 的协议方法实现为:

public func receive(subscriber: downstream)
    where output == downstream.input, failure == downstream.failure
{
    lock.lock()
    if active {
        let conduit = conduit(parent: self, downstream: subscriber) // a.
        downstreams.insert(conduit) // b.
        lock.unlock()
        subscriber.receive(subscription: conduit) // c.
    } else {
        let completion = self.completion!
        lock.unlock()
        subscriber.receive(subscription: subscriptions.empty) // d.
        subscriber.receive(completion: completion) // e.
    }
}

先看 c 处,最终调用了 subscriber 的 receive(subscription:) ,传入的 subscription 对象在 a 处创建,在 b 处被 publisher 插入到自己内部的一个 downstreams 数组中,该对象传给 subscriber 之后也会被 subscriber 对象持有,用于向 publisher 要求 demand、执行 cancel。

我们上面多次介绍到 subscription 类型,但 combine 并没有提供具体的实现类型,因为它其实是某个具体 publisher 的实现的一部分,这里的 conduit 便是 passthroughsubject 的 subscription 实现类。我们接着看 passthroughsubject 的另一个协议约束方法:

// passthroughsubject 的 send(_ input:) 实现
public func send(_ input: output) {
    lock.lock()
    guard active else {
        lock.unlock()
        return
    }
    let downstreams = self.downstreams
    lock.unlock()
    for conduit in downstreams {
        conduit.offer(input)
    }
}
// conduit 的 offer(_ output:) 实现
override func offer(_ output: output) {
    lock.lock()
    guard demand > 0, let downstream = downstream else {
        lock.unlock()
        return
    }
    demand -= 1 // a.
    lock.unlock()
    downstreamlock.lock()
    let newdemand = downstream.receive(output) // b.
    downstreamlock.unlock()
    guard newdemand > 0 else { return }
    lock.lock()
    demand  = newdemand // c.
    lock.unlock()
}

passthroughsubject 的 send(_ input:) 会调用 conduit 的 offer(_ output:),在 offer 方法中,a 处将 demand 减 1,b 处调用 subscriber 的 receive(_ input:) 方法,c 处再将返回的 newdemand 加到现有 demand 上面,和前文描述的逻辑完全一致。

其他方法实现、subscriber 的实现、各种操作符的实现可以直接翻源码,了解各个协议之间的约束之后非常简洁易读。

返回顶部
顶部
网站地图