Creating custom combine Publisher(Operator)
Combine에는 Publisher와 Subscriber라는 개념이 있다.
Combine: framework that let you subscribe to things and get value from things
"Things" = Publisher
- Publisher <자료형, 에러>
- 자료형에는 Int, String, 등등
- 에러에는 아무 Swift Error type
- Subscriber: 소비자 (value를 받아들일 수 있는것)
- sink
- assign
특징
*Publisher는 value 를 바로 emit 하지 않고 subscriber의 demand 가 있어야 emit 한다.
일련의 동작과정
- Subscriber -- (subscribe) --> Publisher
- Publisher 에서 receive(subscribe:) 메서드가 실행
- 그럼 Publisher는 subscription을 만든다.
*subscription은 특정 subscriber와 publisher의 connection을 의미한다. - subscription과 subscriber가 서로 대화하기 시작한다. (Publisher는 빠진다.)
Subscription --(backPressure)--> Subscriber
*backpressure: subscriber가 subscription에게 value를 demand해야 subscription은 값을 줄 수 있다.
값을 준 다음 subscriber는 demand를 update 한다. - Subscription은 cancel 메서드로 취소될 수 있다.
Operator의 정의
Publisher -- 1,2,3 --> map {$0 + 1} --> 2, 3, 4 (effected publisher)
Upstream ---------> operator -------->downstream
Custom operator를 만드는 두가지 방법
1. 기존에 있는 operator 를 조합해서 사용
2. custom operator를 만드는 것 (backPressure를 관리해야되서 까다로움)
ㄴsubscription과 subscriber 사이 demand를 관리해 줘야함
* DisposeBag과 Cancellables의 차이점
- bag는 deallocated됬을때 clear되는데 cancel되진 않는다. combine에서는 subscription이 deinit되면 모든 작업이 cancel된다.
1. 기존의 operator를 조합해서 사용
예시1) Bool 값을 emit 하는 Publisher에 대한 toggle 기능
// 기존에 제공하는 map을 사용하여 toggle operator 만들기
public extension Publisher where Output == Bool {
//Bool 값을 emit하는 모든 publisher에 사용 가능
func toggle() -> Publisher.Map<Self,Bool> {
map(!)
}
}
// operator test code
class BasicOperatorTests: XCTestCase {
var subscription: AnyCancellable!
override func tearDown() {
subscription = nil
}
func testToggle() {
let source = [false, true, true, false].publisher
var vales = [Bool]()
subscription = source
.toggle()
.sink(receiveValue: { values.append($0) })
XCTAssertEqual(values, [true, false, false, true])
}
}
예시2) optional 값을 emit하는 publisher에 대해 unwrap 기능
extension Publisher {
func unwrap<T>() -> Publishers.CompactMap<Self, T> where Output == Optional<T> {
compactMap { $0 }
}
}
let values: [Int?] = [1, 2, nil, 3, nil, 4]
values.publisher
.unwrap()
.sink {
print("Received value: \($0)")
}
class MyTestCase: XCTestCase {
var subscription: AnyCancellable!
override func tearDown() {
subscription = nil
}
func testUnwrap() {
let source = [1, 2, nil, 3, nil, 4].publisher
var values: [Int] = []
subscription = source
.unwrap()
.sink(receiveValue: { values.append($0) })
XCTAssertEqual(values, [1, 2, 3, 4])
}
}
MyTestCase.defaultTestSuite.run()
예시3) 2개씩 pair 하는 기능
//upstream [1, 2, 3, 4, 5]
//downstream [1, 2] [2, 3] [3, 4] [4, 5]
public extension Publisher {
func pair() -> AnyPublisher<[Output], Failure> {
scan([]) { buffer, value in
Array((buffer + [value]).suffix(2))
}
.filter { $0.count == 2 }
.eraseToAnyPublisher()
//implementation detail을 consumer가 알필요 없음 erase를 통해 AnyPublisher를 반환
}
}
// operator test code
class BasicOperatorTests: XCTestCase {
var subscription: AnyCancellable!
override func tearDown() {
subscription = nil
}
func testPairs() {
let source = (1...5).publisher
var values = [[Int]]()
subscription = source
.pair()
.sink(receiveValue: { values.append($0) })
XCTAssertEqual(values, [[1,2], [2,3], [3,4], [4,5]])
}
}
기존에 combine 에서 제공하는 operator를 가지고 composition 하는 방식이라 어렵지 않음.
따로 subscription을 만들 필요가 없고 Publisher namespace 의 extension에 where 절을 통해 Output 조건을 달아주고
원하는 기능의 메서드를 달아주면됨
implementation detail을 가려주기 위해 eraseToAnyPublisher 달아줌
2. custom operator를 만드는 것
- Publisher namespace 의 extension에 publisher 프로토콜을 준수하는 class or struct를 만든다.
(DispatchTimer: Publisher)
- Output 과 Failure 의 타입을 지정해준다.
- func receive<S>(subscriber: S) where S : Subscriber, Never == S.Failure, DispatchTime == S.Input 작성
ㄴ subscriber가 publisher에게 붙었을때 receive 메서드가 실행되면서 subscription을 반환해야함
ㄴ 여기서 custom subscription이 필요함 - Subscription 프로토콜을 준수하는 Custom Subscription class를 만든다.
(DispatchTimerSubscription<S: Subscriber>: Subscription where S.Input == DispatchTime
- private final class CustomSubscription<S: Subscriber>: Subscription where S.Input == YOURTYPE
ㄴ 외부 접근 필요없기 때문에 private으로 하고 상속받을 게 없기 때문에 final 로 선언
- func request(_ demand: Subscribers.Demand)
ㄴ subscriber와의 관계정의, demand 처리
- func cancel()
ㄴ cancel 되었을때 deinit 처리 - 1에서 만든 Publisher를 return 하는 메서드를 Publishers 의 extension에 정의
(static func(_ , _ ,...) -> Publisher.DispatchTimer(...)
예시1) DispatchTimer publisher
값을 만들어서 emit 하는 publisher 예시
- DispatchTimer에 필요한 설정값을 담을 구조체 선언
struct DispatchTimerConfiguration {
let queue: DispatchQueue? //어떤 특정 queue를 사용할건지
let interval: DispatchTimeInterval //타이머의 주기
let leeway: DispatchTimeInterval //여지, deadline 이후 늦어질 수 있는 최대 시간
let times: Subscribers.Demand //이벤트를 몇번 받을 것 인지
}
- Publisher namespace 의 extension에 publisher 프로토콜을 준수하는 class or struct를 만든다.
extension Publishers {
struct DispatchTimer: Publisher {
//Output 과 Failure 의 타입을 지정해준다.
typealias Output = DispatchTime // 현재 시각을 DispatchTime값을 방출하게된다.
typealias Failure = Never //실패할 경우가 없다.
//타이머 세팅에 필요한 설정값을 가진다.
let configuration: DispatchTimerConfiguration
init(configuration: DispatchTimerConfiguration) {
self.configuration = configuration
}
//subscriber가 publisher에게 subscribe 했을 때 호출될 함수
//subscription을 통해 내부 로직이 진행된다.
func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
//DispatchTimerSubscription은 우리가 구현할 custom Subscription 클래스
let subscription = DispatchTimerSubscription(
subscriber: subscriber,
configuration: configuration
)
subscriber.receive(subscription: subscription)
}
}
}
- Subscription 프로토콜을 준수하는 Custom Subscription class를 만든다.
//외부에서 사용하지 않아 private 으로 선언
//Subscriber에 의해 참조되므로 class 로 선언
private final class DispatchTimerSubscription<S: Subscriber>: Subscription where S.Input == DispatchTime {
let configuration: DispatchTimerConfiguration
var times: Subscribers.Demand
var requested: Subscribers.Demand = .none
var source: DispatchSourceTimer? = nil
var subscriber: S?
init(subscriber: S, configuration: DispatchTimerConfiguration) {
self.configuration = configuration
self.subscriber = subscriber
self.times = configuration.times
}
//subscriber와의 관계를 정의한다.
//demand는 acclumulative(누적된다) -> subscriber가 요청한 값이 누적된다.
func request(_ demand: Subscribers.Demand) {
//subscriber에게 configuration에 설정된 값만큼 이미 전달했는지 확인한다.
//즉, Publisher가 받은 demand와 상관없이 예상 값의 최대 수를 보낸 경우 finish로 종료
guard times > .none else {
subscriber?.receive(completion: .finished)
return
}
//새로운 demand를 추가해 requested value를 증가시킴
requested += demand
//timer가 없다면 새로 만들고 있다면 타이머를 시작시킴
if source == nil, requested > .none {
//configuration에 명시된 queue에 따라 DispatchSourceTimer를 생성함
let source = DispatchSource.makeTimerSource(queue: configuration.queue)
//configuration에 명시된 스펙대로 타이머를 세팅
source.schedule(deadline: .now() + configuration.interval,
repeating: configuration.interval,
leeway: configuration.leeway)
///타이머가 한번 시작되면 subscriber에게 값을 방출하지 않아도 멈추지 않음
//멈추려면 subscription을 cancel하거나 deallocate 시켜야함
//timer가 동작할 때 마다 호출되는 클로져
source.setEventHandler { [weak self] in\
//demand가 있는지 확인
guard let self = self, self.requested > .none else { return }
//값을 방출하기 전 requested와 times를 각각 감소시킴
self.requested -= .max(1)
self.times -= .max(1)
//subscriber에게 값을 보냄
_ = self.subscriber?.receive(.now())
//configuration에 나와있는데로 값을 전부 방출했다면 finish 이벤트 전달
if self.times == .none {
self.subscriber?.receive(completion: .finished)
}
}
//타이머 세팅을 끝났으니 activate
self.source = source
source.activate()
}
}
func cancel() {
source = nil
subscriber = nil
}
}
* 왜 demand decrease 시킬때 .max가 붙지?
- 1에서 만든 Publisher를 return 하는 메서드를 Publishers 의 extension에 정의
extension Publishers {
static func timer(queue: DispatchQueue? = nil,
interval: DispatchTimeInterval,
leeway: DispatchTimeInterval = .nanoseconds(0),
times: Subscribers.Demand = .unlimited)
-> Publishers.DispatchTimer {
return Publishers.DispatchTimer(
configuration: .init(queue: queue,
interval: interval,
leeway: leeway,
times: times)
)
}
}
- 사용 코드 예시
var logger = TimeLogger(sinceOrigin: true)
let publisher = Publishers.timer(interval: .seconds(1),
times: .max(6))
let subscription = publisher.sink { time in
print("Timer emits: \(time)", to: &logger)
}
DispatchQueue.main.asyncAfter(deadline: .now() + 3.5) {
subscription.cancel()
}
//6번 timer가 동작하도록 되있지만 3.5초 이후 cancel하기 때문에 output은 3개만 나올것이다.
- 테스트 코드
class MyTestCase: XCTestCase {
var subscription: AnyCancellable!
override func tearDown() {
//timer는 바로 tearDown되면 안됨
// subscription = nil
}
func testTimer() {
let publisher = Publishers.timer(interval: .seconds(1), times: .max(6))
var values: [UInt64] = []
var diffs: [UInt64] = []
print("timer test start")
subscription = publisher.sink {
print("sink value \($0.rawValue)")
values.append($0.rawValue)
}
//5초까지 값을 방출한 뒤 subscription nil을 통해 cancel
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
print(values)
diffs = [values[1] - values[0]
, values[2] - values[1]
, values[3] - values[2]]
self.subscription = nil
}
//Timer가 끝난 후 비교하기 위해 6초뒤 Assert실행
let result = XCTWaiter.wait(for: [expectation(description: "기다려!")], timeout: 6)
//DispatchTimer의 rawValue는 nanoseconds 단위
//100000000 나노초만큼의 오차범위
if result == XCTWaiter.Result.timedOut {
for diff in diffs {
XCTAssertTrue(diff > 900000000 && diff < 1100000000)
}
}else {
XCTFail("Delay interrupted")
}
}
}
MyTestCase.defaultTestSuite.run()
예시2) ShareReplay operator
값을 transforming 하는 operator(publisher) 예시
스펙
- 첫번째 subscriber의 upstream publihser를 구독한다.
- 새로운 subscriber에게 마지막 N개의 값을 replay 한다.
- 완료 이벤트가 미리 방출된 경우 릴레이 한다. (릴레이 경주, 이어달리기)
- Publisher namespace 의 extension에 publisher 프로토콜을 준수하는 class or struct를 만든다.
* subscriber가 같은 instance를 공유해야 하니 class로 만든다.
extension Publishers {
// 20 publisher는 대부분 struct 로 값타입인데 multicast()처럼 class로 만들때도 있다.
// 하나의 operator instance를 여러 subscriber가 공유해야 되서 class로 만든다.
final class ShareReplay<Upstream: Publisher>: Publisher {
// 21 타입을 변경하지는 않으니 Upstream을 따라간다.
typealias Output = Upstream.Output
typealias Failure = Upstream.Failure
// 22 여러개의 subscriber에게 동시에 값을 방출하기 떄문에 lock 이 필요하다.
private let lock = NSRecursiveLock()
// 23 upstream publisher를 가지고 있는다. (subscription 생명주기에 필요함)
private let upstream: Upstream
// 24
private let capacity: Int
// 25
private var replay = [Output]()
// 26 각각의 subscriber들은 할당된 ShareReplaySubscription 으로부터 값을 받는다.
private var subscriptions = [ShareReplaySubscription<Output, Failure>]()
// 27 operator는 completion이 끝난 다음에도 값을 replay할 수 있기 때문에 complete 되었는지 여부를 기록해 놓는다.
private var completion: Subscribers.Completion<Failure>? = nil
init(upstream: Upstream, capacity: Int) {
self.upstream = upstream
self.capacity = capacity
}
func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
lock.lock()
defer { lock.unlock() }
// 34 새 subscription은 subscriber를 참조하고 현재의 replay buffer, capacity, completion 이벤트를 가진다.
let subscription = ShareReplaySubscription(
subscriber: subscriber,
replay: replay,
capacity: capacity,
completion: completion)
// 35 subscription을 가지고 있는다. (향후 이벤트 방출을 위해)
subscriptions.append(subscription)
// 36 만든 subscription을 subscriber에게 보낸다.
// 현재든 나중이던 값을 요청할 것 이다.
subscriber.receive(subscription: subscription)
// 37 한번만 upstream publisher를 subscribe하도록 한다.
guard subscriptions.count == 1 else { return }
let sink = AnySubscriber(
// 38 closure를 받아서 바로 subscription에 .unlimited 값을 요청하는 AnySubscriber class를 만들어서 publisher가 completion이 되도록 한다.
receiveSubscription: { subscription in
subscription.request(.unlimited)
},
// 39 받은 값을 downstream subscriber에게 전달한다.
receiveValue: { [weak self] (value: Output) -> Subscribers.Demand in
self?.relay(value)
return .none
},
// 40 upstream으로부터 받은 completion event로 publisher를 complete 한다.
receiveCompletion: { [weak self] in
self?.complete($0)
}
)
// 바로 complete하기 위해 처음부터 .max(self.capacity) request를 받을 수도 있는데
// combine 은 demand-driven이기 때문에 publisher가 만들어 낼 수 있는만큼 값을 요청하지 않으면 completion event 를 영원히 받지 못한다.
upstream.subscribe(sink)
}
private func relay(_ value: Output) {
// 28 여러 subscriber가 이 publisher를 공유하기 때문에 mutable 변수에 대해서 접근을 lock으로 보호해야 한다.
lock.lock()
defer { lock.unlock() }
//굳이 defer을 사용안해도 되지만 unlock 하는걸 까먹을 수 있기 때문에 미리 작성해둔다.
// defer : defer란, 현재 코드 블록을 나가기 전에 꼭 실행해야 되는 코드를 작성하여 코드가 블록을 어떻게 빠져 나가든 꼭 마무리해야 되는 작업을 할 수 있게 도와줍니다.
// 29 upstream 이 complete 되지 않았을때 값을 전달하도록 한다.
guard completion == nil else { return }
// 30 버퍼에 값을 추가하고 capacity 유지
replay.append(value)
if replay.count > capacity {
replay.removeFirst()
}
// 31 연결된 subscriber에게 값을 전달
subscriptions.forEach {
$0.receive(value)
}
}
private func complete(_ completion: Subscribers.Completion<Failure>) {
lock.lock()
defer { lock.unlock() }
// 32 completion event를 미래 subscriber를 위해 저장한다.
self.completion = completion
// 33 연결된 subscriber에게 값을 전달
subscriptions.forEach {
$0.receive(completion: completion)
}
}
}
}
- Subscription 프로토콜을 준수하는 Custom Subscription class를 만든다.
// 1
// Publisher와 Subscriber모두 Subscription에 접근하여 값을 바꿔야 하기 떄문에
// 공통 instance에 접근해야 함으로 제네릭 class 사용하여 subscription을 만든다.
fileprivate final class ShareReplaySubscription<Output, Failure: Error>: Subscription {
// 2 버퍼의 크기
let capacity: Int
// 3 구독하는 동안 subscriber에 대한 레퍼런스를 가지고 있다.
// 3 AnyPublisher로 타입을 삭제했다.
var subscriber: AnySubscriber<Output,Failure>? = nil
// 4 subscriber가 publisher에게 보낸 누적된 demand(요구)
var demand: Subscribers.Demand = .none
// 5 대기중인 값을 저장할 버퍼
var buffer: [Output]
// 6 completion이 일어날 수 있는 가능성을 가지고 있어 새로운 subscriber들 에게 값을 요구하는 동시에 바로 전달할 수 있게 한다.?
var completion: Subscribers.Completion<Failure>? = nil
// 바로 전달할 때 완료 이벤트를 계속 유지할 필요가 없다고 생각한다면 그렇지 않으니 안심하세요.
// 구독자는 먼저 구독을 수신한 다음 값을 수락할 준비가 되자마자 완료 이벤트를 수신해야 합니다.
// 첫 번째 요청(_:)이 이를 신호로 보냅니다.
// 게시자는 이 요청이 언제 발생할지 모르기 때문에 적시에 전달하기 위해 구독에 완료를 전달합니다.
init<S>(subscriber: S, replay: [Output], capacity: Int, completion: Subscribers.Completion<Failure>?) where S: Subscriber, Failure == S.Failure, Output == S.Input {
// 7 타입이 지워진 subscriber를 저장한다.
self.subscriber = AnySubscriber(subscriber)
// 8 upstream publisher의 현재 buffer, capacity, completion event를 저장한다. (방출 될 경우)
self.buffer = replay
self.capacity = capacity
self.completion = completion
}
// completion event를 subscriber에게 전달 해줄 method
private func complete(with completion: Subscribers.Completion<Failure>) {
// 9 함수가 실행되는 동안 subscriber를 가지고 있으면서 class 의 subsscriber는 nil로 만들어
// subscriber가 잘못된 호출을 했을때 completion 이벤트를 무시하도록 한다.
guard let subscriber = subscriber else { return }
self.subscriber = nil
// 10 completion은 한번만 보내지도록 nil로 바꾸고 버퍼도 비워준다.
self.completion = nil
self.buffer.removeAll()
// 11 completion event를 subscriber에게 전달한다.
subscriber.receive(completion: completion)
}
//값을 subscriber에게 방출하는 method
//outstanding value: 갚지 못한 값
private func emitAsNeeded() {
guard let subscriber = subscriber else { return }
// 12 버퍼에 값이 있거나 demand가 있을 경우에만 방출한다.
while self.demand > .none && !buffer.isEmpty {
// 13 방출 후 demand를 낮춘다.
self.demand -= .max(1)
// 14 버퍼의 값 중 첫번째를 보내고 새로운 demand를 받는다.
let nextDemand = subscriber.receive(buffer.removeFirst())
// 15 새로운 demand를 누적시킨다.
if nextDemand != .none { //.none 은 demand += 하면 crash 나니 if 로 걸러준다
self.demand += nextDemand
}
}
// 16 completion 이벤트가 대기중이라면 바로 보낸다.
if let completion = completion {
complete(with: completion)
}
}
func request(_ demand: Subscribers.Demand) {
if demand != .none { //crash 방지를 위해
self.demand += demand
}
emitAsNeeded()
}
func receive(_ input: Output) {
guard subscriber != nil else { return }
// 17 버퍼에 값을 추가한다.
buffer.append(input)
if buffer.count > capacity {
// 18 FIFO 방식으로 capacity만큼 유지한다.
buffer.removeFirst()
}
// 19 subscriber에게 결과를 전달한다.
emitAsNeeded()
}
func receive(completion: Subscribers.Completion<Failure>) {
guard let subscriber = subscriber else { return }
self.subscriber = nil
self.buffer.removeAll()
subscriber.receive(completion: completion)
}
func cancel() {
complete(with: .finished)
}
}
- 1에서 만든 Publisher를 return 하는 메서드를 Publishers 의 extension에 정의
extension Publisher {
func shareReplay(capacity: Int = .max) -> Publishers.ShareReplay<Self> {
return Publishers.ShareReplay(upstream: self, capacity: capacity)
}
}
- 실행코드
let publisher = subject
// .print("shareReplay")
.shareReplay(capacity: 2)
// 44 아직 연결된 subscriber가 없어서 output 은 없다.
subject.send(0)
let subscription1 = publisher.sink(
receiveCompletion: {
print("subscription1 completed: \($0)", to: &logger)
},
receiveValue: {
print("subscription1 received \($0)", to: &logger)
}
)
subject.send(1)
subject.send(2)
subject.send(3)
let subscription2 = publisher.sink(
receiveCompletion: {
print("subscription2 completed: \($0)", to: &logger)
},
receiveValue: {
print("subscription2 received \($0)", to: &logger)
}
)
subject.send(4)
subject.send(5)
subject.send(completion: .finished)
var subscription3: Cancellable? = nil
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
print("Subscribing to shareReplay after upstream completed")
subscription3 = publisher.sink(
receiveCompletion: {
print("subscription3 completed: \($0)", to: &logger)
},
receiveValue: {
print("subscription3 received \($0)", to: &logger)
}
)
}
//subscription1은 subscribe되어 있기 떄문에 subject가 emit 할때 바로 받는다.
+0.41118s: subscription1 received 1
+0.41234s: subscription1 received 2
+0.41289s: subscription1 received 3
//subscription2는 1,2,3이 방출된 뒤에 구독되었지만 마지막 두개의 값이 버퍼에 저장되어 있어서 2,3 을 받는다.
+0.41367s: subscription2 received 2
+0.41373s: subscription2 received 3
// 이후 4,5가 방출되면 sub1, sub2 둘다 값을 받는다.
+0.41408s: subscription1 received 4
+0.41425s: subscription2 received 4
+0.41482s: subscription1 received 5
+0.41499s: subscription2 received 5
// completion 이벤트도 전달된다.
+0.41703s: subscription1 completed: finished
+0.41720s: subscription2 completed: finished
// 이벤트가 끝난 이후에 subscribe되어도 마지막 2개의 값을 받는다.
//(completion 여부도 저장하고 있어서 complete이벤트 방출된다.)
+1.41976s: subscription3 received 4
+1.41991s: subscription3 received 5
+1.42038s: subscription3 completed: finished
예시3) PausableSink
backpressure(배압) 이해를 위한 예시
배압: 유체역학에서 유체가 흐르는 파이프에서 흐름에 반대되는, 저항하는 힘을 뜻함
combine에서는 publisher로 부터 오는 값에 흐름에 대한 저항
저항이란 -> subscriber가 받은 publisher로 부터온 값들을 처리하는(process)데 걸리는 시간
- input 주기가 굉장히 짧은 데이터 (센서 데이터)
- 대용량 파일 전송
- 데이터 업데이트 이후 복잡한 UI 업데이트
- user input 기다릴때
- [들어오는 데이터 -> sink 내부에서 처리 -> 나가는 데이터] 에서 들어오는 데이터 >> 나가는 데이터 일때
이를 처리하기 위해 Combine 은 pull 디자인 방식을 채택함
- subscriber가 publisher에게 요구하지 않는 한 publisher는 새로운 값을 방출하지 않도록 한다.
- closing the tap -> 수도꼭지처럼 필요할때 열고 필요없으면 잠그는 방식
*Subscriber가 Puslibher에게 할 수 있는 demand (요구) 는 더하는 방향으로만 수정될 수 있다.
새로운 값을 받을 때 마다 demand를 증가시키거나 (.max(N) 이나 .unlimited 를 통해)
증가시키지 않는(.none) 선택 두가지만 할 수 있다.
* 3개의 값을 요구했는데 1개 밖에 받지 못한 경우에는 연결상태를 유지한다. (수도꼭지를 닫지 않는다.)
2개를 받지 못한 상태일 때는 새로 요구를 추가하지 않아도 최대 2개를 받을 수 있다.
만약 요구한 값보다 더 많은 값이 대기하고 있다면 몇가지 방식으로 처리할 수 있다.
- Publisher가 처음부터 Subscriber가 처리할 수 있는 만큼의 값만 방출하도록 한다.
- Subscriber가 처리할 수 있을 때 까지 Buffer에 저장한다.
- Subscriber가 감당할 수 없는 값은 버린다.
- 위 3개를 적절히 섞는다.
배압을 처리하는 방식은 크게 두가지 형태를 가진다.
- Publisher에서 처리: custom Subscription을 통해 혼잡을 해결하는 Publisher를 만든다.
- Subscriber에서 처리: publisher 고리의 마지막에서 값을 전달하는 subscriber를 만든다. (sink)
subscriber도 결국 operator고 sink 도 결국 operator이고 sink도 subscriber라고 볼 수 있다.
값을 만들어 내는 Operator로 Publisher라고 할 수 있을까?
예제코드 (pausableSink)
import Combine
import Foundation
protocol Pausable {
var paused: Bool { get }
func resume()
}
// 1 Pausable, Canccellable 프로토콜을 준수한ㄴ Subscriber를 만든다.
// pasuableSink function이 반환할 객체이다.
// 객체가 복사되지 않아야 하기 때문에 reference 타입으로 class 로 선언한다.
final class PausableSubscriber<Input, Failure: Error>: Subscriber, Pausable, Cancellable {
// 2 subscriber는 반드시 unique identifier를 프레임워크에 제공해야한다.
let combineIdentifier = CombineIdentifier()
// 3 true:값을 더 받는다. false: pause subscription
let receiveValue: (Input) -> Bool
// 4 publisher로 부터 completion event를 받았을 때 아래의 클로져가 호출된다.
let receiveCompletion: (Subscribers.Completion<Failure>) -> Void
// 5 일시 정지된 다음 다시 값을 요청할 수 있도록 레퍼런스를 저장해둔다.
// retain cycle 때문에 필요하지 않을떄 nil로 선언한다.
private var subscription: Subscription? = nil
// 6 Pausable protocol 필요한 변수
var paused = false
// 7 publisher로부터 값을 받을때와 complete 이벤트를 받을 때 실행될 클로져 두개를 받는다.
// 기존의 sink 와 비슷한데 한가지 차이점은 receiveValue 클로져는 Bool 을 반환한다는 것이다.
init(receiveValue: @escaping (Input) -> Bool,
receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void) {
self.receiveValue = receiveValue
self.receiveCompletion = receiveCompletion
}
// 8
func cancel() {
subscription?.cancel()
subscription = nil
}
func receive(subscription: Subscription) {
// 9 추후에 일시정지하거나 재개할때 필요하기 때문에 subscription을 저장한다.
self.subscription = subscription
// 10 바로 하나의 값을 요청한다.
// subscriber는 일시정지가 가능하고 언제 정지될 지 모르기 때문에 값하나를 요청한다.
subscription.request(.max(1))
}
func receive(_ input: Input) -> Subscribers.Demand {
// 11 값을 받고 pause 상태를 업데이트 한다.
paused = receiveValue(input) == false
// 12 멈췄을 떄는 demand를 .none으로 해서 값이 더이상 필요없다고 알린다.
return paused ? .none : .max(1)
}
func receive(completion: Subscribers.Completion<Failure>) {
// 13 completion event를 받았을 때 receiveCompletion으로 포워딩 한다.
//그리고 필요없으니 subscription을 nil로 만든다.
receiveCompletion(completion)
subscription = nil
}
func resume() {
guard paused else { return }
paused = false
// 14 publisher가 멈춘 상태라면 값을 요청하여 사이클이 돌도록 한다.
subscription?.request(.max(1))
}
}
extension Publisher {
// 15 Bool을 리턴하는것 빼면 sink operator와 비슷하다.
func pausableSink( receiveCompletion: @escaping ((Subscribers.Completion<Failure>) -> Void), receiveValue: @escaping ((Output) -> Bool)) -> Pausable & Cancellable {
// 16 새로운 PausableSubscriber를 만들어 publisher(self)에 subsscribe한다.
let pausable = PausableSubscriber(
receiveValue: receiveValue,
receiveCompletion: receiveCompletion)
self.subscribe(pausable)
// 17
return pausable
}
}
//MARK: example code
let subscription = [1, 2, 3, 4, 5, 6]
.publisher
.pausableSink(receiveCompletion: { completion in
print("Pausable subscription completed: \(completion)")
}) { value -> Bool in
print("Receive value: \(value)")
if value % 2 == 1 {
print("Pausing")
return false
}
return true
}
let timer = Timer.publish(every: 1, on: .main, in: .common)
.autoconnect()
.sink { _ in
guard subscription.paused else { return }
print("Subscription is paused, resuming")
subscription.resume()
}
Publisher vs Publishers
Reference
Combine_Asynchronous_Programming_with_Swift_v2.0.0