본문 바로가기
iOS/Combine

Creating custom combine Publisher(Operator)

by HaningYa 2021. 10. 4.
728x90

Combine에는 Publisher와 Subscriber라는 개념이 있다.

Combine: framework that let you subscribe to things and get value from things

"Things" = Publisher

  • Publisher <자료형, 에러>
    • 자료형에는 Int, String, 등등
    • 에러에는 아무 Swift Error type
  • Subscriber: 소비자 (value를 받아들일 수 있는것)
    • sink
    • assign

특징
*Publisher는 value 를 바로 emit 하지 않고 subscriber의 demand 가 있어야 emit 한다.

일련의 동작과정

  1. Subscriber -- (subscribe) --> Publisher
  2. Publisher 에서 receive(subscribe:) 메서드가 실행
  3. 그럼 Publisher는 subscription을 만든다.
    *subscription은 특정 subscriber와 publisher의 connection을 의미한다.
  4. subscription과 subscriber가 서로 대화하기 시작한다. (Publisher는 빠진다.) 
    Subscription --(backPressure)--> Subscriber
    *backpressure: subscriber가 subscription에게 value를 demand해야 subscription은 값을 줄 수 있다.
    값을 준 다음 subscriber는 demand를 update 한다.
  5. Subscription은 cancel 메서드로 취소될 수 있다.

Operator의 정의

Publisher -- 1,2,3 --> map {$0 + 1} --> 2, 3, 4 (effected publisher)
Upstream ---------> operator -------->downstream

Custom operator를 만드는 두가지 방법

1. 기존에 있는 operator 를 조합해서 사용
2. custom operator를 만드는 것 (backPressure를 관리해야되서 까다로움)
ㄴsubscription과 subscriber 사이 demand를 관리해 줘야함

* DisposeBag과 Cancellables의 차이점
- bag는 deallocated됬을때 clear되는데 cancel되진 않는다. combine에서는 subscription이 deinit되면 모든 작업이 cancel된다.

1. 기존의 operator를 조합해서 사용

예시1) Bool 값을 emit 하는 Publisher에 대한 toggle 기능

// 기존에 제공하는 map을 사용하여 toggle operator 만들기
public extension Publisher where Output == Bool {
//Bool 값을 emit하는 모든 publisher에 사용 가능
	func toggle() -> Publisher.Map<Self,Bool> {
    	map(!) 
    }
}

// operator test code
class BasicOperatorTests: XCTestCase {
	var subscription: AnyCancellable!
    
	override func tearDown() {
		subscription = nil
	}
    
	func testToggle() {
		let source = [false, true, true, false].publisher
		var vales = [Bool]()
		subscription = source
			.toggle()
			.sink(receiveValue: { values.append($0) })
            
		XCTAssertEqual(values, [true, false, false, true])
    }
}

예시2) optional 값을 emit하는 publisher에 대해 unwrap 기능

extension Publisher {
    func unwrap<T>() -> Publishers.CompactMap<Self, T> where Output == Optional<T> {
        compactMap { $0 }
    }
}

let values: [Int?] = [1, 2, nil, 3, nil, 4]

values.publisher
    .unwrap()
    .sink {
        print("Received value: \($0)")
    }


class MyTestCase: XCTestCase {

    var subscription: AnyCancellable!

    override func tearDown() {
        subscription = nil
    }

    func testUnwrap() {
        let source = [1, 2, nil, 3, nil, 4].publisher
        var values: [Int] = []
        subscription = source
            .unwrap()
            .sink(receiveValue: { values.append($0) })
        XCTAssertEqual(values, [1, 2, 3, 4])
    }
}

MyTestCase.defaultTestSuite.run()

예시3) 2개씩 pair 하는 기능

//upstream   [1, 2, 3, 4, 5]
//downstream [1, 2] [2, 3] [3, 4] [4, 5]
public extension Publisher {
    func pair() -> AnyPublisher<[Output], Failure> {
    	scan([]) { buffer, value in 
        	Array((buffer + [value]).suffix(2))
        }
        .filter { $0.count == 2 }
        .eraseToAnyPublisher() 
        //implementation detail을 consumer가 알필요 없음 erase를 통해 AnyPublisher를 반환
    } 
}


// operator test code
class BasicOperatorTests: XCTestCase {
	var subscription: AnyCancellable!
    
	override func tearDown() {
		subscription = nil
	}
    
	func testPairs() {
		let source = (1...5).publisher
		var values = [[Int]]()
    
		subscription = source
			.pair()
			.sink(receiveValue: { values.append($0) })
		XCTAssertEqual(values, [[1,2], [2,3], [3,4], [4,5]])
	}
}

기존에 combine 에서 제공하는 operator를 가지고 composition 하는 방식이라 어렵지 않음.

따로 subscription을 만들 필요가 없고 Publisher namespace 의 extension에 where 절을 통해 Output 조건을 달아주고
원하는 기능의 메서드를 달아주면됨

implementation detail을 가려주기 위해 eraseToAnyPublisher 달아줌

2. custom operator를 만드는 것

    1. Publisher namespace 의 extension에 publisher 프로토콜을 준수하는 class or struct를 만든다.
      (DispatchTimer: Publisher)
      - Output 과 Failure 의 타입을 지정해준다.
      - func receive<S>(subscriber: S) where S : Subscriber, Never == S.Failure, DispatchTime == S.Input 작성
          ㄴ subscriber가 publisher에게 붙었을때 receive 메서드가 실행되면서 subscription을 반환해야함
          ㄴ 여기서 custom subscription이 필요함

    2. Subscription 프로토콜을 준수하는 Custom Subscription class를 만든다.
      (DispatchTimerSubscription<S: Subscriber>: Subscription where S.Input == DispatchTime
      - private final class CustomSubscription<S: Subscriber>: Subscription where S.Input == YOURTYPE
          ㄴ 외부 접근 필요없기 때문에 private으로 하고 상속받을 게 없기 때문에 final 로 선언
      - func request(_ demand: Subscribers.Demand)
          ㄴ subscriber와의 관계정의, demand 처리
      -
      func cancel()
          ㄴ cancel 되었을때 deinit 처리
    3. 1에서 만든 Publisher를 return 하는 메서드를 Publishers 의 extension에 정의
      (static func(_ , _ ,...) -> Publisher.DispatchTimer(...)

예시1) DispatchTimer publisher
값을 만들어서 emit 하는 publisher 예시

- DispatchTimer에 필요한 설정값을 담을 구조체 선언

struct DispatchTimerConfiguration {
    let queue: DispatchQueue?		//어떤 특정 queue를 사용할건지
    let interval: DispatchTimeInterval	//타이머의 주기
    let leeway: DispatchTimeInterval	//여지, deadline 이후 늦어질 수 있는 최대 시간
    let times: Subscribers.Demand	//이벤트를 몇번 받을 것 인지
}

- Publisher namespace 의 extension에 publisher 프로토콜을 준수하는 class or struct를 만든다.

extension Publishers {
    struct DispatchTimer: Publisher {
	//Output 과 Failure 의 타입을 지정해준다.
        typealias Output = DispatchTime // 현재 시각을 DispatchTime값을 방출하게된다.
        typealias Failure = Never	//실패할 경우가 없다.
	//타이머 세팅에 필요한 설정값을 가진다.
        let configuration: DispatchTimerConfiguration

        init(configuration: DispatchTimerConfiguration) {
            self.configuration = configuration
        }
	//subscriber가 publisher에게 subscribe 했을 때 호출될 함수
     //subscription을 통해 내부 로직이 진행된다.
        func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
            //DispatchTimerSubscription은 우리가 구현할 custom Subscription 클래스
            let subscription = DispatchTimerSubscription(
                subscriber: subscriber,
                configuration: configuration
            )
            subscriber.receive(subscription: subscription)
        }
    }
}

- Subscription 프로토콜을 준수하는 Custom Subscription class를 만든다.

//외부에서 사용하지 않아 private 으로 선언
//Subscriber에 의해 참조되므로 class 로 선언

private final class DispatchTimerSubscription<S: Subscriber>: Subscription where S.Input == DispatchTime {
    let configuration: DispatchTimerConfiguration
    var times: Subscribers.Demand
    var requested: Subscribers.Demand = .none
    var source: DispatchSourceTimer? = nil
    var subscriber: S?

    init(subscriber: S, configuration: DispatchTimerConfiguration) {
        self.configuration = configuration
        self.subscriber = subscriber
        self.times = configuration.times
    }

//subscriber와의 관계를 정의한다.
//demand는 acclumulative(누적된다) -> subscriber가 요청한 값이 누적된다.
    func request(_ demand: Subscribers.Demand) {
    	
        //subscriber에게 configuration에 설정된 값만큼 이미 전달했는지 확인한다.
        //즉, Publisher가 받은 demand와 상관없이 예상 값의 최대 수를 보낸 경우 finish로 종료
        guard times > .none else {
            subscriber?.receive(completion: .finished)
            return
        }
	//새로운 demand를 추가해 requested value를 증가시킴
        requested += demand
        
	//timer가 없다면 새로 만들고 있다면 타이머를 시작시킴
        if source == nil, requested > .none {
        //configuration에 명시된 queue에 따라 DispatchSourceTimer를 생성함
            let source = DispatchSource.makeTimerSource(queue: configuration.queue)
            //configuration에 명시된 스펙대로 타이머를 세팅
            source.schedule(deadline: .now() + configuration.interval,
                            repeating: configuration.interval,
                            leeway: configuration.leeway)
        ///타이머가 한번 시작되면 subscriber에게 값을 방출하지 않아도 멈추지 않음
        //멈추려면 subscription을 cancel하거나 deallocate 시켜야함
        
		//timer가 동작할 때 마다 호출되는 클로져
            source.setEventHandler { [weak self] in\
            //demand가 있는지 확인
                guard let self = self, self.requested > .none else { return }
		//값을 방출하기 전 requested와 times를 각각 감소시킴
                self.requested -= .max(1)
                self.times -= .max(1)
                //subscriber에게 값을 보냄
                _ = self.subscriber?.receive(.now())
                //configuration에 나와있는데로 값을 전부 방출했다면 finish 이벤트 전달
                if self.times == .none {
                    self.subscriber?.receive(completion: .finished)
                }
            }
	//타이머 세팅을 끝났으니 activate
            self.source = source
            source.activate()
        }
    }

    func cancel() {
        source = nil
        subscriber = nil
    }
}

* 왜 demand decrease 시킬때 .max가 붙지?

- 1에서 만든 Publisher를 return 하는 메서드를 Publishers 의 extension에 정의

extension Publishers {
    static func timer(queue: DispatchQueue? = nil,
                      interval: DispatchTimeInterval,
                      leeway: DispatchTimeInterval = .nanoseconds(0),
                      times: Subscribers.Demand = .unlimited)
    -> Publishers.DispatchTimer {
        return Publishers.DispatchTimer(
            configuration: .init(queue: queue,
                                 interval: interval,
                                 leeway: leeway,
                                 times: times)
        )
    }
}

- 사용 코드 예시

var logger = TimeLogger(sinceOrigin: true)
let publisher = Publishers.timer(interval: .seconds(1),
                                 times: .max(6))

let subscription = publisher.sink { time in
    print("Timer emits: \(time)", to: &logger)
}

DispatchQueue.main.asyncAfter(deadline: .now() + 3.5) {
    subscription.cancel()
}

//6번 timer가 동작하도록 되있지만 3.5초 이후 cancel하기 때문에 output은 3개만 나올것이다.

 

- 테스트 코드

class MyTestCase: XCTestCase {

    var subscription: AnyCancellable!

    override func tearDown() {
        //timer는 바로 tearDown되면 안됨
//        subscription = nil
    }

    func testTimer() {
        let publisher = Publishers.timer(interval: .seconds(1), times: .max(6))
        var values: [UInt64] = []
        var diffs: [UInt64] = []

        print("timer test start")
        subscription = publisher.sink {
            print("sink value \($0.rawValue)")
            values.append($0.rawValue)
        }
		//5초까지 값을 방출한 뒤 subscription nil을 통해 cancel
        DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
            print(values)
            diffs = [values[1] - values[0]
                        , values[2] - values[1]
                        , values[3] - values[2]]
            self.subscription = nil
        }
        
        //Timer가 끝난 후 비교하기 위해 6초뒤 Assert실행
        let result = XCTWaiter.wait(for: [expectation(description: "기다려!")], timeout: 6)
		//DispatchTimer의 rawValue는 nanoseconds 단위 
        //100000000 나노초만큼의 오차범위
        if result == XCTWaiter.Result.timedOut {
            for diff in diffs {
                XCTAssertTrue(diff > 900000000 && diff < 1100000000)
            }
        }else {
            XCTFail("Delay interrupted")
        }

    }
}

MyTestCase.defaultTestSuite.run()

 

예시2) ShareReplay operator
값을 transforming 하는 operator(publisher) 예시

스펙
- 첫번째 subscriber의 upstream publihser를 구독한다.
- 새로운 subscriber에게 마지막 N개의 값을 replay 한다.
- 완료 이벤트가 미리 방출된 경우 릴레이 한다. (릴레이 경주, 이어달리기)

- Publisher namespace 의 extension에 publisher 프로토콜을 준수하는 class or struct를 만든다.
* subscriber가 같은 instance를 공유해야 하니 class로 만든다.

extension Publishers {
    // 20 publisher는 대부분 struct 로 값타입인데 multicast()처럼 class로 만들때도 있다.
    // 하나의 operator instance를 여러 subscriber가 공유해야 되서 class로 만든다.
    final class ShareReplay<Upstream: Publisher>: Publisher {
        // 21 타입을 변경하지는 않으니 Upstream을 따라간다.
        typealias Output = Upstream.Output
        typealias Failure = Upstream.Failure

        // 22 여러개의 subscriber에게 동시에 값을 방출하기 떄문에 lock 이 필요하다.
        private let lock = NSRecursiveLock()
        // 23 upstream publisher를 가지고 있는다. (subscription 생명주기에 필요함)
        private let upstream: Upstream
        // 24
        private let capacity: Int
        // 25
        private var replay = [Output]()
        // 26 각각의 subscriber들은 할당된 ShareReplaySubscription 으로부터 값을 받는다.
        private var subscriptions = [ShareReplaySubscription<Output, Failure>]()
        // 27 operator는 completion이 끝난 다음에도 값을 replay할 수 있기 때문에 complete 되었는지 여부를 기록해 놓는다.
        private var completion: Subscribers.Completion<Failure>? = nil

        init(upstream: Upstream, capacity: Int) {
            self.upstream = upstream
            self.capacity = capacity
        }

        func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {

            lock.lock()
            defer { lock.unlock() }

            // 34 새 subscription은 subscriber를 참조하고 현재의 replay buffer, capacity, completion 이벤트를 가진다.
            let subscription = ShareReplaySubscription(
                subscriber: subscriber,
                replay: replay,
                capacity: capacity,
                completion: completion)

            // 35 subscription을 가지고 있는다. (향후 이벤트 방출을 위해)
            subscriptions.append(subscription)
            // 36 만든 subscription을 subscriber에게 보낸다.
            // 현재든 나중이던 값을 요청할 것 이다.
            subscriber.receive(subscription: subscription)

            // 37 한번만 upstream publisher를 subscribe하도록 한다.
            guard subscriptions.count == 1 else { return }

            let sink = AnySubscriber(
                // 38 closure를 받아서 바로 subscription에 .unlimited 값을 요청하는 AnySubscriber class를 만들어서 publisher가 completion이 되도록 한다.
                receiveSubscription: { subscription in
                    subscription.request(.unlimited)
                },
                // 39 받은 값을 downstream subscriber에게 전달한다.
                receiveValue: { [weak self] (value: Output) -> Subscribers.Demand in
                    self?.relay(value)
                    return .none
                },
                // 40 upstream으로부터 받은 completion event로 publisher를 complete 한다.
                receiveCompletion: { [weak self] in
                    self?.complete($0)
                }
            )
            // 바로 complete하기 위해 처음부터 .max(self.capacity) request를 받을 수도 있는데
            // combine 은 demand-driven이기 때문에 publisher가 만들어 낼 수 있는만큼 값을 요청하지 않으면 completion event 를 영원히 받지 못한다.
            upstream.subscribe(sink)
        }

        private func relay(_ value: Output) {
            // 28 여러 subscriber가 이 publisher를 공유하기 때문에 mutable 변수에 대해서 접근을 lock으로 보호해야 한다.
            lock.lock()
            defer { lock.unlock() }
            //굳이 defer을 사용안해도 되지만 unlock 하는걸 까먹을 수 있기 때문에 미리 작성해둔다.
            // defer : defer란, 현재 코드 블록을 나가기 전에 꼭 실행해야 되는 코드를 작성하여 코드가 블록을 어떻게 빠져 나가든 꼭 마무리해야 되는 작업을 할 수 있게 도와줍니다.

            // 29 upstream 이 complete 되지 않았을때 값을 전달하도록 한다.
            guard completion == nil else { return }

            // 30 버퍼에 값을 추가하고 capacity 유지
            replay.append(value)
            if replay.count > capacity {
                replay.removeFirst()
            }
            // 31 연결된 subscriber에게 값을 전달
            subscriptions.forEach {
                $0.receive(value)
            }
        }

        private func complete(_ completion: Subscribers.Completion<Failure>) {
            lock.lock()
            defer { lock.unlock() }
            // 32 completion event를 미래 subscriber를 위해 저장한다.
            self.completion = completion
            // 33 연결된 subscriber에게 값을 전달
            subscriptions.forEach {
                $0.receive(completion: completion)
            }
        }
    }
}

- Subscription 프로토콜을 준수하는 Custom Subscription class를 만든다.

// 1
// Publisher와 Subscriber모두 Subscription에 접근하여 값을 바꿔야 하기 떄문에
// 공통 instance에 접근해야 함으로 제네릭 class 사용하여 subscription을 만든다.
fileprivate final class ShareReplaySubscription<Output, Failure: Error>: Subscription {
    // 2 버퍼의 크기
    let capacity: Int
    // 3 구독하는 동안 subscriber에 대한 레퍼런스를 가지고 있다.
    // 3 AnyPublisher로 타입을 삭제했다.
    var subscriber: AnySubscriber<Output,Failure>? = nil
    // 4 subscriber가 publisher에게 보낸 누적된 demand(요구)
    var demand: Subscribers.Demand = .none
    // 5 대기중인 값을 저장할 버퍼
    var buffer: [Output]
    // 6 completion이 일어날 수 있는 가능성을 가지고 있어 새로운 subscriber들 에게 값을 요구하는 동시에 바로 전달할 수 있게 한다.?
    var completion: Subscribers.Completion<Failure>? = nil
    // 바로 전달할 때 완료 이벤트를 계속 유지할 필요가 없다고 생각한다면 그렇지 않으니 안심하세요.
    // 구독자는 먼저 구독을 수신한 다음 값을 수락할 준비가 되자마자 완료 이벤트를 수신해야 합니다.
    // 첫 번째 요청(_:)이 이를 신호로 보냅니다.
    // 게시자는 이 요청이 언제 발생할지 모르기 때문에 적시에 전달하기 위해 구독에 완료를 전달합니다.

    init<S>(subscriber: S, replay: [Output], capacity: Int, completion: Subscribers.Completion<Failure>?) where S: Subscriber, Failure == S.Failure, Output == S.Input {
        // 7 타입이 지워진 subscriber를 저장한다.
        self.subscriber = AnySubscriber(subscriber)
        // 8 upstream publisher의 현재 buffer, capacity, completion event를 저장한다. (방출 될 경우)
        self.buffer = replay
        self.capacity = capacity
        self.completion = completion
    }

    // completion event를 subscriber에게 전달 해줄 method
    private func complete(with completion: Subscribers.Completion<Failure>) {
        // 9 함수가 실행되는 동안 subscriber를 가지고 있으면서 class 의 subsscriber는 nil로 만들어
        // subscriber가 잘못된 호출을 했을때 completion 이벤트를 무시하도록 한다.
        guard let subscriber = subscriber else { return }
        self.subscriber = nil
        // 10 completion은 한번만 보내지도록 nil로 바꾸고 버퍼도 비워준다.
        self.completion = nil
        self.buffer.removeAll()
        // 11 completion event를 subscriber에게 전달한다.
        subscriber.receive(completion: completion)
    }
    //값을 subscriber에게 방출하는 method
    //outstanding value: 갚지 못한 값
    private func emitAsNeeded() {
        guard let subscriber = subscriber else { return }
        // 12 버퍼에 값이 있거나 demand가 있을 경우에만 방출한다.
        while self.demand > .none && !buffer.isEmpty {
            // 13 방출 후 demand를 낮춘다.
            self.demand -= .max(1)
            // 14 버퍼의 값 중 첫번째를 보내고 새로운 demand를 받는다.
            let nextDemand = subscriber.receive(buffer.removeFirst())
            // 15 새로운 demand를 누적시킨다.
            if nextDemand != .none { //.none 은 demand += 하면 crash 나니 if 로 걸러준다
                self.demand += nextDemand
            }
        }
        // 16 completion 이벤트가 대기중이라면 바로 보낸다.
        if let completion = completion {
            complete(with: completion)
        }
    }

    func request(_ demand: Subscribers.Demand) {
        if demand != .none { //crash 방지를 위해
            self.demand += demand
        }
        emitAsNeeded()
    }

    func receive(_ input: Output) {
        guard subscriber != nil else { return }
        // 17 버퍼에 값을 추가한다.
        buffer.append(input)
        if buffer.count > capacity {
            // 18 FIFO 방식으로 capacity만큼 유지한다.
            buffer.removeFirst()
        }
        // 19 subscriber에게 결과를 전달한다.
        emitAsNeeded()
    }

    func receive(completion: Subscribers.Completion<Failure>) {
        guard let subscriber = subscriber else { return }
        self.subscriber = nil
        self.buffer.removeAll()
        subscriber.receive(completion: completion)
    }

    func cancel() {
        complete(with: .finished)
    }
}

- 1에서 만든 Publisher를 return 하는 메서드를 Publishers 의 extension에 정의

extension Publisher {
    func shareReplay(capacity: Int = .max) -> Publishers.ShareReplay<Self> {
        return Publishers.ShareReplay(upstream: self, capacity: capacity)
    }
}

- 실행코드

let publisher = subject
//    .print("shareReplay")
    .shareReplay(capacity: 2)
// 44 아직 연결된 subscriber가 없어서 output 은 없다.
subject.send(0)

let subscription1 = publisher.sink(
    receiveCompletion: {
        print("subscription1 completed: \($0)", to: &logger)
    },
    receiveValue: {
        print("subscription1 received \($0)", to: &logger)
    }
)

subject.send(1)
subject.send(2)
subject.send(3)

let subscription2 = publisher.sink(
    receiveCompletion: {
        print("subscription2 completed: \($0)", to: &logger)
    },
    receiveValue: {
        print("subscription2 received \($0)", to: &logger)
    }
)

subject.send(4)
subject.send(5)
subject.send(completion: .finished)

var subscription3: Cancellable? = nil

DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
    print("Subscribing to shareReplay after upstream completed")
    subscription3 = publisher.sink(
        receiveCompletion: {
            print("subscription3 completed: \($0)", to: &logger)
        },
        receiveValue: {
            print("subscription3 received \($0)", to: &logger)
        }
    )
}
//subscription1은 subscribe되어 있기 떄문에 subject가 emit 할때 바로 받는다.
+0.41118s: subscription1 received 1	
+0.41234s: subscription1 received 2
+0.41289s: subscription1 received 3

//subscription2는 1,2,3이 방출된 뒤에 구독되었지만 마지막 두개의 값이 버퍼에 저장되어 있어서 2,3 을 받는다.
+0.41367s: subscription2 received 2	
+0.41373s: subscription2 received 3

// 이후 4,5가 방출되면 sub1, sub2 둘다 값을 받는다.
+0.41408s: subscription1 received 4
+0.41425s: subscription2 received 4
+0.41482s: subscription1 received 5
+0.41499s: subscription2 received 5

// completion 이벤트도 전달된다.
+0.41703s: subscription1 completed: finished
+0.41720s: subscription2 completed: finished

// 이벤트가 끝난 이후에 subscribe되어도 마지막 2개의 값을 받는다. 
//(completion 여부도 저장하고 있어서 complete이벤트 방출된다.)
+1.41976s: subscription3 received 4
+1.41991s: subscription3 received 5
+1.42038s: subscription3 completed: finished

예시3) PausableSink
backpressure(배압) 이해를 위한 예시

배압: 유체역학에서 유체가 흐르는 파이프에서 흐름에 반대되는, 저항하는 힘을 뜻함
combine에서는 publisher로 부터 오는 값에 흐름에 대한 저항

저항이란 -> subscriber가 받은 publisher로 부터온 값들을 처리하는(process)데 걸리는 시간

  • input 주기가 굉장히 짧은 데이터 (센서 데이터)
  • 대용량 파일 전송
  • 데이터 업데이트 이후 복잡한 UI 업데이트
  • user input 기다릴때
  • [들어오는 데이터 -> sink 내부에서 처리 -> 나가는 데이터] 에서 들어오는 데이터 >> 나가는 데이터 일때

이를 처리하기 위해 Combine 은 pull 디자인 방식을 채택함
- subscriber가 publisher에게 요구하지 않는 한 publisher는 새로운 값을 방출하지 않도록 한다.
- closing the tap -> 수도꼭지처럼 필요할때 열고 필요없으면 잠그는 방식

*Subscriber가 Puslibher에게 할 수 있는 demand (요구) 는 더하는 방향으로만 수정될 수 있다.
새로운 값을 받을 때 마다 demand를 증가시키거나 (.max(N) 이나 .unlimited 를 통해)
증가시키지 않는(.none) 선택 두가지만 할 수 있다.

* 3개의 값을 요구했는데 1개 밖에 받지 못한 경우에는 연결상태를 유지한다. (수도꼭지를 닫지 않는다.)
2개를 받지 못한 상태일 때는 새로 요구를 추가하지 않아도 최대 2개를 받을 수 있다.

만약 요구한 값보다 더 많은 값이 대기하고 있다면 몇가지 방식으로 처리할 수 있다.

  1. Publisher가 처음부터 Subscriber가 처리할 수 있는 만큼의 값만 방출하도록 한다.
  2. Subscriber가 처리할 수 있을 때 까지 Buffer에 저장한다.
  3. Subscriber가 감당할 수 없는 값은 버린다.
  4. 위 3개를 적절히 섞는다.

배압을 처리하는 방식은 크게 두가지 형태를 가진다.

  • Publisher에서 처리: custom Subscription을 통해 혼잡을 해결하는 Publisher를 만든다.
  • Subscriber에서 처리: publisher 고리의 마지막에서 값을 전달하는 subscriber를 만든다. (sink)

subscriber도 결국 operator고 sink 도 결국 operator이고 sink도 subscriber라고 볼 수 있다.

값을 만들어 내는 Operator로 Publisher라고 할 수 있을까?

 

 

예제코드 (pausableSink)

더보기
import Combine
import Foundation

protocol Pausable {
  var paused: Bool { get }
  func resume()
}

// 1 Pausable, Canccellable 프로토콜을 준수한ㄴ Subscriber를 만든다.
// pasuableSink function이 반환할 객체이다.
// 객체가 복사되지 않아야 하기 때문에 reference 타입으로 class 로 선언한다.
final class PausableSubscriber<Input, Failure: Error>: Subscriber, Pausable, Cancellable {
  // 2 subscriber는 반드시 unique identifier를 프레임워크에 제공해야한다.
  let combineIdentifier = CombineIdentifier()

  // 3 true:값을 더 받는다. false: pause subscription
  let receiveValue: (Input) -> Bool
  // 4 publisher로 부터 completion event를 받았을 때 아래의 클로져가 호출된다.
  let receiveCompletion: (Subscribers.Completion<Failure>) -> Void

  // 5 일시 정지된 다음 다시 값을 요청할 수 있도록 레퍼런스를 저장해둔다.
    // retain cycle 때문에 필요하지 않을떄 nil로 선언한다.
  private var subscription: Subscription? = nil
  // 6 Pausable protocol 필요한 변수
  var paused = false

  // 7 publisher로부터 값을 받을때와 complete 이벤트를 받을 때 실행될 클로져 두개를 받는다.
    // 기존의 sink 와 비슷한데 한가지 차이점은 receiveValue 클로져는 Bool 을 반환한다는 것이다.
  init(receiveValue: @escaping (Input) -> Bool,
       receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void) {
    self.receiveValue = receiveValue
    self.receiveCompletion = receiveCompletion
  }

  // 8
  func cancel() {
    subscription?.cancel()
    subscription = nil
  }

  func receive(subscription: Subscription) {
    // 9 추후에 일시정지하거나 재개할때 필요하기 때문에 subscription을 저장한다.
    self.subscription = subscription
    // 10 바로 하나의 값을 요청한다.
      // subscriber는 일시정지가 가능하고 언제 정지될 지 모르기 때문에 값하나를 요청한다.
    subscription.request(.max(1))
  }

  func receive(_ input: Input) -> Subscribers.Demand {
    // 11 값을 받고 pause 상태를 업데이트 한다.
    paused = receiveValue(input) == false
    // 12 멈췄을 떄는 demand를 .none으로 해서 값이 더이상 필요없다고 알린다.
    return paused ? .none : .max(1)
  }

  func receive(completion: Subscribers.Completion<Failure>) {
    // 13 completion event를 받았을 때 receiveCompletion으로 포워딩 한다.
      //그리고 필요없으니 subscription을 nil로 만든다.
    receiveCompletion(completion)
    subscription = nil
  }

  func resume() {
    guard paused else { return }

    paused = false
    // 14 publisher가 멈춘 상태라면 값을 요청하여 사이클이 돌도록 한다.
    subscription?.request(.max(1))
  }
}

extension Publisher {
  // 15 Bool을 리턴하는것 빼면 sink operator와 비슷하다.
  func pausableSink( receiveCompletion: @escaping ((Subscribers.Completion<Failure>) -> Void), receiveValue: @escaping ((Output) -> Bool)) -> Pausable & Cancellable {
    // 16 새로운 PausableSubscriber를 만들어 publisher(self)에 subsscribe한다.
    let pausable = PausableSubscriber(
      receiveValue: receiveValue,
      receiveCompletion: receiveCompletion)
    self.subscribe(pausable)
    // 17
    return pausable
  }
}


//MARK: example code
let subscription = [1, 2, 3, 4, 5, 6]
  .publisher
  .pausableSink(receiveCompletion: { completion in
    print("Pausable subscription completed: \(completion)")
  }) { value -> Bool in
    print("Receive value: \(value)")
    if value % 2 == 1 {
      print("Pausing")
      return false
    }
    return true
}

let timer = Timer.publish(every: 1, on: .main, in: .common)
  .autoconnect()
  .sink { _ in
    guard subscription.paused else { return }
    print("Subscription is paused, resuming")
    subscription.resume()
  }

 

 

 

 

Publisher vs Publishers

 

 

 

 

Reference

Combine_Asynchronous_Programming_with_Swift_v2.0.0

https://www.youtube.com/watch?v=Vnk4vFyuBGo&t=1044s 

728x90

'iOS > Combine' 카테고리의 다른 글

#18 Custom Publishers & Handling Backpressure  (0) 2020.11.03
#17 Schedulers  (0) 2020.11.03
#Chapter15 In Practice: Combine & SwiftUI  (0) 2020.11.02
#12 Key-Value Observing  (0) 2020.10.30
#9 Networking  (0) 2020.10.28

댓글