iOS/Combine

#2 Publishers & Subscribers

HaningYa 2020. 10. 27. 14:36
728x90

[출처:www.raywenderlich.com/books/combine-asynchronous-programming-with-swift/v2.0]

목차

  • Publisher
  • Subscriber
  • Cancellable
  • 어떻게 동작하는지
  • custom subscriber 만들기
  • Future
  • Subject
  • Dynamically adjusting demand
  • type erasure
  • Challenge
  • Keypoint

Getting started

playground starter project 설명

Hello publisher

Combine 의 중심에는 publisher protocol 이 있다.
이 프로토콜은 하나 이상의 subscriber 들에게 연속되는 값을 계속해서 전송할 수 있는 타입이 필요로 하는 것들을 정의하고 있다.
다르게 말하지만 publisher는 값을 관심있는 애들한테 publish 또는 emit 할 수 있다.

NotificationCenter 과 비슷하다는걸 느낄 수 가 있는데 NotificationCenter 또한 broadcasted notification을 publish 할 수 있는 publisher 라는 메서드를 가지고 있다. 

example(of: "Publisher") {
  // 1
let myNotification = Notification.Name("MyNotification")
// 2
let publisher = NotificationCenter.default .publisher(for: myNotification, object: nil)
}
  • notification name 을 정해주고
  • NotificationCenter 의 default center 에 접근해 publisher메소드를 부리고 return value 를 지역 상수에 할당해준다.

notificationcenter 의 publisher 메소드의 설명을 보면 Publisher 타입을 리턴한다.

Notification center 는 publisher 없이도 이벤트를 전달할 수 있는데 왜 publisher 를 사용하는가?

이것을 구시대와 신시대를 연결해주는 다리라고 생각하면 된다. (기존에 있는 NotificationCenter API 를 Combine화 하는 방법)

publisher 는 2개의 이벤트를 emit 하는데

  • value (element)
  • completion event

publisher는 0 또는 그 이상 값을 emit 할 수 있지만 completion handler(noraml completion, event, error)는 한번만 emit 가능하다.

한번 publisher 가 completion event 를 emit 하면 그것은 끝난 것이고 추가적인 이벤트를 emit 하지 않는다.

더 깊이 배워보기 전에 예전 방식으로 NotificationCenter API 를 활용해 observer를 달아서 노티를 받아보자.
그리고 observer를 떼서 그만 받아보자

// 3
let center = NotificationCenter.default
// 4
let observer = center.addObserver( forName: myNotification,object: nil, queue: nil) { notification in
    print("Notification received!")
}
// 5
center.post(name: myNotification, object: nil)
// 6
center.removeObserver(observer)
  • center 변수에 default notification 할당
  • notification을 듣고있는 Observer를 만든다. (notification 은 위에서 만든 myNotification 사용)
  • 해당 이름으로 notification 을 post 한다.
  • notification 을 remove 한다.
 ——— Example of: Publisher ———
Notification received!

콘솔에 이렇게 나타나는데 약간 헷갈릴 수 도 있는데 사실 publisher 오는 결과 아니기 때문이다. 그러기 위해서 subscriber가 있어야 한다.

Hello Subscriber

Subscriber 프로토콜은 publisher로 부터 input을 받을 수 있는 type 에 대한 요구사항을 정의하고있다.
조금있다가 Publisher 와 Subscriber protocol 을 준수하는게 무엇인지 깊게 배워볼 것이다.
일단 지금은 기보적인것 만 보자면

example(of: "Subscriber") {
	let myNotification = Notification.Name("MyNotification")
	let publisher = NotificationCenter.default.publisher(for: myNotification, object: nil)
	let center = NotificationCenter.default 
}

만약 notification을 post 했다면 publisher는 emit 하지 않고 그게 중요한 차별점이다.
publisher는 최소 한개의 subscriber 가 있을때만 emit 한다는 점이다.

Subscribing with sink(_:_:)

이전 코드에 이 코드를 더한다.

// 1
let subscription = publisher.sink { _ in
    print("Notification received from a publisher!")
  }
// 2
center.post(name: myNotification, object: nil) 
// 3
subscription.cancel()
  • publisher 에 .sink를 호출해 subscription을 만든다.
  • notification 을 post 한다.
  • subscription을 cancel 한다.

sink는 잠기는 느낌이 아닌 subscriber를 closure 와 함께 붙여 publisher 의 output을 처리하는 간단한 방법이다.
이 예제에서는 closure는 무시하고 대신 message를 출력해여 notification 이 received되었다는 것만 표시한다.

   ——— Example of: Publisher ———
Notification received from a publisher!

라고 console 에 뜰 것 이다.

sink operator 는 계속해서 publisher 가 emit 하는 많은 값을 수신한다.
이것은 unlimited demand 라고 알려져있는데 다음번에 배울 것 이다.
sink operator 는 두개의 클로져를 제공한다.

  • receiving a completion event
  • handle receiving values

어떻게 동작하는지 보려면 해당 코드를 밑에 추가해라

example(of: "Just") {
  // 1
  let just = Just("Hello world!")
// 2
_ = just .sink(
      receiveCompletion: {
        print("Received completion", $0)
      },
      receiveValue: {
        print("Received value", $0)
    })
}

여기서 너는

  1. 원시 값타입으로 부터 Just를 사용하려 publisher를 만들었다.
  2. publisher 에 대한 subscription을 만들어 이벤트에 따라 받은 message를 출력했다.

실행해보면 console 에는 아래와 같이 출력된다.

 ——— Example of: Just ———
Received value Hello world!
Received completion finished

Just의 설명을 살펴보면 해당 결과값을 각각의 subscriber 에게 한번만 전달하고 끝나는 Publisher 라도 되있다.

다른 subscriber를 붙여보자

_ = just .sink(
    receiveCompletion: {
      print("Received completion (another)", $0)
    },
    receiveValue: {
      print("Received value (another)", $0)
  })

실행 시키면 Just Publisher 가 새로운 subscriber에게 딱 한번만 emit 하고 끝나는 걸 볼 수 있다.

Received value (another) Hello world!
Received completion (another) finished

Subscribing with assign(to:on:)

sink에 추가적으로 assign(to:on:) Operator 는 수신된 값을 KVO-compliant property of object 에게 assign 할 수 있도록 한다.

예를들어

example(of: "assign(to:on:)") {
  // 1
  class SomeObject {
    var value: String = "" {
      didSet {
        print(value)
      }
   } 
}
// 2
  let object = SomeObject()
  
// 3
  let publisher = ["Hello", "world!"].publisher

// 4
  _ = publisher
	  .assign(to: \.value, on: object)
}
  1. 새로운 값을 출력하는 didSet property observer를 가진 class 를 정의한다.
  2. class 의 인스턴스를 만든다.
  3. 문자열 배열에서 publisher를 만든다.
  4. publisher를 subscribe 해서 수신된 각각의 값을 객체의 value 프로퍼티에 할당한다.

실행시켜 보면 

 ——— Example of: assign(to:on:) ———
Hello
world!

일단 sink operator 에 집중하지만 8장에서 assign 에 대해 알아 볼 것 이다.

Hello Cancellable

subscriber가 할일을 다하고 더이상 publisher 로 부터 값을 받지 않는 경우 subscritpion을 cancel하여 사용하고있는 리소스나 네트워크 콜과 같은 활동들을 멈추어 리소스를 해제하는게 좋은 생각이다.

subscriptions 는 AnyCancellable 인스턴스를 cancellation token 처럼 리턴한다. (작업이 끝나고 subscription을 cancel 할 수 있는)

AnyCancellable 프로토콜은 Cancellable 프로토콜을준수하는데 cancel() 메소드를 딱 아까 말했던 목적때문에 가지고 있다. 

Subscriber example 에서 subscription을 cancel 해보자

// 1
center.post(name: myNotification, object: nil)
// 2
subscription.cancel()
  1. notification 을 post 한다.
  2. subscription은 cancel 한다. subscription에서 cancel() 메서드를 호출할 수 있다. 왜냐하면 Subscription protocol 은 Cancellable 를 상속?받기 때문이다.

실행해보면 

 ——— Example of: Subscriber ———
Notification received from a publisher!

만약 명시적으로 cancel()을 호출하지 않으면 publisher 가 complete 할 때 까지 계속되거나 일반적인 메모리 관리자가 저장된 subscription을 내리기 전까지 계속된다. 어느 순간에는 subscription을 cancel 해줄 것 이다.

*subscription에서 온 return value를 무시하는 것도 좋지만 하나 예고하자만 subscription을 전체 프로젝트에 저장하지 않으면 그 subscription은 프로그램 flow 가 scope 를 벗어나는 즉시 cancel 될 것 이다.

이제 이런 기능이 어떻게 작동되는지 알아보자

Understanding what's going on

UML(unified modeling language) 그림으로 보면

https://koenig-media.raywenderlich.com/uploads/2020/01/Publisher-Subscriber.png

  1. subscriber 는 publisher를 구독한다.
  2. publisher 는 subscription을 만들어 subscriber 에게 전달한다.
  3. subscriber 가 값을 요청한다.
  4. publisher 가 값을 전송한다.
  5. publisher 가 completion을 전송한다.

*해당 내용의 세부 내용은 18장에서 배우게 된다.

Publisher protocol 과 중요한 extension들을 보면

public protocol Publisher {
// 1
  associatedtype Output
// 2
  associatedtype Failure : Error
// 4
  func receive<S>(subscriber: S) 
    where S: Subscriber, 
    Self.Failure == S.Failure, 
    Self.Output == S.Input
}
extension Publisher {
// 3
  public func subscribe<S>(_ subscriber: S) 
    where S : Subscriber,
    Self.Failure == S.Failure,
    Self.Output == S.Input
}
  1. publisher 가 만들 수 있는 값의 타입
  2. publisher 가 만들 수 있는 error 타입 또는 에러가 없는게 보장될 경우 Never 할당
  3. subscriber는 publisher 와 붙을 수 있게 subscribe(_:) 함수를 호출
  4. subscribe(_:)의 구현은 receive(subscriber:)를 호출하여 subscriber 를 publisher 에게 연동시킴(creation of subscription)

associated types 는 publisher interface 로 subscriber subscription을 만들기 위해 무조건 일치시켜야 한다.

이제 Subscriber protocol 을 보면

public protocol Subscriber: CustomCombineIdentifierConvertible {
// 1
  associatedtype Input
// 2
  associatedtype Failure: Error
// 3
  func receive(subscription: Subscription)
// 4
  func receive(_ input: Self.Input) -> Subscribers.Demand
// 5
  func receive(completion: Subscribers.Completion<Self.Failure>) 
}
  1. subscriber 가 수신할 수 있는 값 타입
  2. subscriber 가 수신할 수 있는 error 타입, 에러를 받는 경우가 없는게 보장되면 Never 할당
  3. publisher는 subscriber 에게 subscription을 주기 위해 receive(subscription:) 을 호출한다.
  4. publisher 는 subscriber의 receive(_:)를 호출해 방금 published 된 새로운 값을 subscriber 에게 전달
  5. publisher 는 subscriber 의 receive(completion:) 를 호출해 값을 만드는게 끝났거나 에러가 발생했다고 알림

publisher 와 subscriber 의 연결을 subscription이라고 한다. 여기서 Subscription 프로토콜을 보면

public protocol Subscription: Cancellable,
  CustomCombineIdentifierConvertible {
    func request(_ demand: Subscribers.Demand) 
}

subscriber는 request(_:)를 호출해 값을 최대 무한대의 값을 더 받고 싶음을 나타낸다.


* subscriber 가 얼마나 많은 갑을 받고싶은지에 대해 말하는 개념은 backpressure management 라고 불린다.

**Backpressure is when the progress of turning that input to output is resisted in some way (배압)

컴포넌트가 대처할 수 없고 장애가 발생해선 안 되기 때문에 컴포넌트는 상류 컴포넌트들에 자신이 과부하 상태라는 것을 알려 부하를 줄이도록 해야 한다.

 

용어집 - 리액티브 선언문

용어집 비동기 옥스퍼드 사전은 비동기를 _"동시에 존재하거나 발생하지 않는"_ 이라고 정의한다. 이 선언문의 문맥에서는 클라이언트에서 서비스로 전송된 요청이 이후 임의의 시점에 처리된

www.reactivemanifesto.org


Creating custom subscriber

코드를 작성한다.

example(of: "Custom Subscriber") {
// 1
  let publisher = (1...6).publisher
// 2
  final class IntSubscriber: Subscriber {
// 3
  typealias Input = Int
  typealias Failure = Never
// 4
  func receive(subscription: Subscription) {
    subscription.request(.max(3)) 
  }
// 5
  func receive(_ input: Int) -> Subscribers.Demand { 
    print("Received value", input)
    return .none 
  }
// 6
  func receive(completion: Subscribers.Completion<Never>) {
      print("Received completion", completion)
    }
  } 
}
  1. 정수들 값을 범위로 만들어 publisher 로 만든다.
  2. IntSubscriber라는 custom subscriber 를 만든다.
  3. 타입 별명을 붙여서 이 subscriber는 오직 정수 입력만 받을 수 있고 절대 에러가 발생하지 않는다고 명시한다.
  4. 필요한 메소드를 구현한다.
    • receive(subscription:) : publisher 에게 호출된다.
      그리고 해당 메소드 안에 .request(_:)를 호출해 subscriber가 subcription을 통해 최대 3개의 값을 받을 수 있게 해준다.
  5. 수신될때 각각의 값을 출력하고 .none을 리턴하여 demand를 수정하지 않를 것을 표시한다. .none은 .max(0)과 같다.
  6. completion event 를 출력한다. 

publisher 가 어떤걸 publish 하던 subscriber가 있어야 하니까 추가한다.

 let subscriber = IntSubscriber() 
 publisher.subscribe(subscriber)

이 코드에서는 출력값의 타입과 Failure 타입이 같은 subscriber를 만들고 publisher 에게 subscrib 하라고 한다.

실행해보면

——— Example of: Custom Subscriber ———
Received value 1
Received value 2
Received value 3

completion event 를 받지 못한 걸 볼수 있다. 왜냐하면 publisher 는 최대 3개만 값을 받을 수 있기 때문이다.

receive 에서 .none 을 .unlimited로 바꾸면 모든값이 다 출력되고 completion event 도 출력되는걸 볼 수 있다.

func receive(_ input: Int) -> Subscribers.Demand { 
  print("Received value", input)
  return .unlimited 
}
——— Example of: Custom Subscriber ———
Received value 1
Received value 2
Received value 3
Received value 4
Received value 5
Received value 6
Received completion finished

.unlimited를 .max(1)로 바꾸고 다시 실행시켜 보면 .unlimied와 같은 결과가 출력되는걸 볼 수 있는데 왜냐하면 각각 이벤트를 받을 때 마다 1씩 max를 증가시키고 있기 때문이다.

다시 .max(1)를 .none으로 바꾸고 publisher 정의를 문자의 배열로 바꿔본다.

let publisher = ["A", "B", "C", "D", "E", "F"].publisher

실행시켜보면 subscribe 메소드에서 error가 발생하는데 publisher 와 subscriber 의 Input 과 Failure type 이 맞지않기 때문이다.

publisher를 다시 원래대로 정수 범위로 바꿔줘 error 를 해결한다. 

Hello Future

Just를 사용해서 하나의 값만 emit 하는 publisher 를 만들어 본것 처럼 Future 도 비동기적으로 하나의 결과값을 만들고 complete 한다.

example(of: "Future") {
  func futureIncrement(
    integer: Int,
    afterDelay delay: TimeInterval) -> Future<Int, Never> {
  } 
}

여기서 factory cuntion을 만들어 Int와 Never 타입을 가지는 future을 반환한다. 

또한 subscriptions 집합을 추가해 future에 subscrition들을 저장할 것 이다. 

길게 동작하는 비동기 작업들은 subscription을 저장하지 않으면 현재 code 의 scope 가 끝날 때  cancelation을 초래한다.
(플레이그라운드에 경우 바로 cancelation이 발생한다.)

다음으로 함수 body를 채워 future 을 만든다.

Future<Int, Never> { promise in 
  DispatchQueue.global().asyncAfter(deadline: .now() + delay) {
    promise(.success(integer + 1)) 
  }
}

이 코드는 fure를 정의해서 caller 에 의해 명시된 값을 사용해 integer 를 delay 이후에 증가시킬 것이라는 promise 를 만든다.

Future는 결국 single value 를 만들고 끝나는 Pulblisher 이다.
그리고 값이나 에러가 가능해 졌을 때 closure 를 invoke 하고 해당 invoked 된 closure 는 promise 로 언급된다.

Future 의 정의를 보자면

final public class Future<Output, Failure> : Publisher
  where Failure: Error {
  public typealias Promise = (Result<Output, Failure>) -> Void ...
}

Promise 는 Future 에 의해 published 된 single value 또는 error 를 가지는 Result 를 수신하는 클로져의 별명이다.

futureincrement 정의에 코드를 추가해본다.

// 1
let future = futureIncrement(integer: 1, afterDelay: 3)
// 2
future
  .sink(receiveCompletion: { print($0) },
    receiveValue: { print($0) }) 
  .store(in: &subscriptions)
  1. 이전에 만든 factory function을 사용하여 future을 만들고 3초 뒤에 전달한 정수값을 증가시키라고 적어놨다.
  2. subscribe 하고 값과 completion event 를 수신하고 resulting subscription을 subscriptions set 에 저장한다.
    이 챕터 뒤에서 subscriptions을 collection에 저장하는걸 배우게 된다.

실행시켜 보면 다음과 같이 콘솔에 찍힌다.

 ——— Example of: Future ———
2
finished

두번째 subscription을 future에 더해라

future
  .sink(receiveCompletion: { print("Second", $0) },
    receiveValue: { print("Second", $0) }) 
  .store(in: &subscriptions)

playground를 실행하기전에 다음 출력문을 futureIncrement 함수의 DisPatchQueue블록에 넣어라

print("Original")

실행시켜 보면 특정 delay 이후 두번째 subscription이 같은 값을 수신하는걸 알 수 있다. 
future은 promise 를 다시 실행시킨게 아니라 결과값을 share, replay 한 것 이다.

——— Example of: Future ———
Original
2
finished
Second 2
Second finished

Original 출력은 subscription이 일어난 다음 바로 출력된다.
기서은 future 가 만들어진 직후 바로 실행하기 때문이다.
일반적인 publisher 와는 다르게 subscriber가 필요없다.

앞서 예제들로 제한된 갯수만큼 Publish 되는 값에 대해서 작업해 보았다.

notification center 예제는 publisher 의 예시로써 무제한으로 아래의 2개를 제공하며 비동기적으로 value 를 publish 할 수 있었다.

  1. notification sender 가 notifications 를 emit 한다.
  2. 특정 notification 에 대한 구체적인 subscriber 가 있다.

위 작업을 내 코드로 똑같이 해볼 수 있는 방법인 Subject 에 대해서 알아본다.

Hello Subject

publisher 와 subscriber와 custom subscriber를 만드는 방법까지 공부해 봤다면 이후에는 custom publisher를 만드는 방법을 배우게 될 것 이다. 

일단 지금은 subject 에 대해서 배워보자

subject는 combine 이 아닌 코드에서 combine subscriber 에게  value 를 전송할 수 있게 해준다.

example(of: "PassthroughSubject") {
  // 1
  enum MyError: Error {
    case test 
  }
  // 2
  final class StringSubscriber: Subscriber {
    typealias Input = String
    typealias Failure = MyError
    
    func receive(subscription: Subscription) {
      subscription.request(.max(2)) 
    }
    func receive(_ input: String) -> Subscribers.Demand { 
      print("Received value", input)
      // 3
      return input == "World" ? .max(1) : .none
    }
    func receive(completion: Subscribers.Completion<MyError>) {
      print("Received completion", completion)
    }
  }
  // 4
  let subscriber = StringSubscriber()
}
  • custom error type 을 정의한다.
  • MyError 에러와 strings를 수신하는 custom subscriber를 정의한다.
  • 수신된 값에 대해 요구사항을 반영한다.
  • custom subscriber instance 를 만든다.

입력값이 "World"일 때 .max(1)을 receive(_:) 에 반환하면 새로운 max 는 3으로 된다. (원래 max 값 + 1)

custom error type을 정의하고 요구에 맞게 수신된 값을 바꾸는 것 외엔 새로운 건 없다.

이제 흥미로운 부분이 추가되는데

// 5
let subject = PassthroughSubject<String, MyError>()
// 6
subject.subscribe(subscriber)
// 7
let subscription = subject .sink(
    receiveCompletion: { completion in
      print("Received completion (sink)", completion)
    },
    receiveValue: { value in
      print("Received value (sink)", value)
    }
)
  • String 타입과 custom error type 의 PassthroughSubject 인스턴스를 만든다.
  • subscriber 를 subject 에 subscribes 한다. (구독자를 주제에 구독한다)
  • sink 를 통해 다른 subscription을 만든다.

Passthrough subject 는 새로운 값을 publish 할 수 있게 해준다.

이런 값들과 completion event 를 전달 하는데 어느 publisher 든 먼저 값의 type 과 발생할 수 있는 error 에 대해서 먼저 선언해야 한다.

subscribers는 무조건 input 과 failture type 에 맞춰져야 비로소 passthrough subject를 subscribe할 수 있다.

이제 passthrough subject를 만들었으니 값을 보내보자

 subject.send("Hello") 
 subject.send("World")

이 코드는 한번에 하나씩 2개의 값을 보낸다. 

——— Example of: PassthroughSubject ———
Received value Hello
Received value (sink) Hello
Received value World
Received value (sink) World

각각의 subscriber 는 published 된 값을 수신한다.

subscription.cancel()
subject.send("Still there?")
  • 두번째 subscription 을 cancel 하고 
  • 다른 값을 보낸다.
Received value Hello
Received value (sink) Hello
Received value World
Received value (sink) World
Received value Still there?

코드를 더 추가하면

 subject.send(completion: .finished) 
 subject.send("How about another one?")

실행해보면 subscriber는 "How about another one?"는 수신하지 않는다. 왜냐하면 completion event 를 value가 전해진 바로 다음에 받기 때문이다. 첫번째 subscriber는 completion event나 값을 받지 못한다. 왜냐하면 subscription이 이전에 canceled 되었기 때문이다.

 ——— Example of: PassthroughSubject ———
Received value Hello
Received value (sink) Hello
Received value World
Received value (sink) World
Received value Still there?
Received completion finished

completion event를 보내기 전 line에 코드를 추가해 보면

subject.send(completion: .failure(MyError.test))
——— Example of: PassthroughSubject ———
Received value Hello
Received value (sink) Hello
Received value World
Received value (sink) World
Received value Still there?
Received completion failure(...MyError.test)

첫번째 subscriber 에 의해 에러가 받아진다. 하지만 error 이후에 보내진  completion event 는  아니다.

이것은 publisher 가 single completion event를 보냈 을 경우 (에러이든 아니던) 끝나는 것이다. 

PassthroughSubject로 값을 전달하는 방법은 명령형 코드와 선언형 세계를 이어주는 다리 역할인 것 이다.

하지만 가끔은 publisher 가 가진 현재값을 보고싶을 경우가 있다. 그럴 때 CurrentValueSubject를 사용한다.

subscription 각각의 값을 저장하는 대신 AnyCancellable의 콜렉션에 subscription을 담다둘 수 있다.

이 collection은 더해진 subscription들을 자기 자신이 deinit 될때 함께 cancel 시킨다.

example(of: "CurrentValueSubject") {
  // 1
  var subscriptions = Set<AnyCancellable>()
  // 2
  let subject = CurrentValueSubject<Int, Never>(0)
  // 3
  subject
    .sink(receiveValue: { print($0) })
    .store(in: &subscriptions) // 4 
 }

 

  • subscriptions set 를 만든다.
  • CurerntValueSubject 를 만든다. (초기값 0)
  • subject를 sink 를 통해 구독하고 값을 출력하게 한다.
  • subscription set 에 subscription을 저장하고 inout 파라미터를 통해 copy 대신 업데이트 되게 한다.
 ——— Example of: CurrentValueSubject ———
0

실행시켜 보면 초기값 0을 출력하는 것을 볼 수 있다.

subject.send(1) 
subject.send(2)

두개를 더 보내면 1,2 가 찍힌다.

passthroughSubject 와는 다르게 current value 를 .value 프로퍼티를 통해 물어볼 수 있다.

subject.value

또한 send(_:)를 통해 새로운 값을 보낼 수 있다. 다른 방법으로는 value 프로퍼티에 그냥 값을 할당하면된다.

subject.value = 3

단 value 프로퍼티에는 .finished 와 같은 event 는 보낼 수 없기에 send(_:) 메소드를 사용해야 한다.

Dynamically adjusting demand

 

728x90