#2 Publishers & Subscribers
[출처:www.raywenderlich.com/books/combine-asynchronous-programming-with-swift/v2.0]
목차
- Publisher
- Subscriber
- Cancellable
- 어떻게 동작하는지
- custom subscriber 만들기
- Future
- Subject
- Dynamically adjusting demand
- type erasure
- Challenge
- Keypoint
Getting started
playground starter project 설명
Hello publisher
Combine 의 중심에는 publisher protocol 이 있다.
이 프로토콜은 하나 이상의 subscriber 들에게 연속되는 값을 계속해서 전송할 수 있는 타입이 필요로 하는 것들을 정의하고 있다.
다르게 말하지만 publisher는 값을 관심있는 애들한테 publish 또는 emit 할 수 있다.
NotificationCenter 과 비슷하다는걸 느낄 수 가 있는데 NotificationCenter 또한 broadcasted notification을 publish 할 수 있는 publisher 라는 메서드를 가지고 있다.
example(of: "Publisher") {
// 1
let myNotification = Notification.Name("MyNotification")
// 2
let publisher = NotificationCenter.default .publisher(for: myNotification, object: nil)
}
- notification name 을 정해주고
- NotificationCenter 의 default center 에 접근해 publisher메소드를 부리고 return value 를 지역 상수에 할당해준다.
notificationcenter 의 publisher 메소드의 설명을 보면 Publisher 타입을 리턴한다.
Notification center 는 publisher 없이도 이벤트를 전달할 수 있는데 왜 publisher 를 사용하는가?
이것을 구시대와 신시대를 연결해주는 다리라고 생각하면 된다. (기존에 있는 NotificationCenter API 를 Combine화 하는 방법)
publisher 는 2개의 이벤트를 emit 하는데
- value (element)
- completion event
publisher는 0 또는 그 이상 값을 emit 할 수 있지만 completion handler(noraml completion, event, error)는 한번만 emit 가능하다.
한번 publisher 가 completion event 를 emit 하면 그것은 끝난 것이고 추가적인 이벤트를 emit 하지 않는다.
더 깊이 배워보기 전에 예전 방식으로 NotificationCenter API 를 활용해 observer를 달아서 노티를 받아보자.
그리고 observer를 떼서 그만 받아보자
// 3
let center = NotificationCenter.default
// 4
let observer = center.addObserver( forName: myNotification,object: nil, queue: nil) { notification in
print("Notification received!")
}
// 5
center.post(name: myNotification, object: nil)
// 6
center.removeObserver(observer)
- center 변수에 default notification 할당
- notification을 듣고있는 Observer를 만든다. (notification 은 위에서 만든 myNotification 사용)
- 해당 이름으로 notification 을 post 한다.
- notification 을 remove 한다.
——— Example of: Publisher ———
Notification received!
콘솔에 이렇게 나타나는데 약간 헷갈릴 수 도 있는데 사실 publisher 오는 결과 아니기 때문이다. 그러기 위해서 subscriber가 있어야 한다.
Hello Subscriber
Subscriber 프로토콜은 publisher로 부터 input을 받을 수 있는 type 에 대한 요구사항을 정의하고있다.
조금있다가 Publisher 와 Subscriber protocol 을 준수하는게 무엇인지 깊게 배워볼 것이다.
일단 지금은 기보적인것 만 보자면
example(of: "Subscriber") {
let myNotification = Notification.Name("MyNotification")
let publisher = NotificationCenter.default.publisher(for: myNotification, object: nil)
let center = NotificationCenter.default
}
만약 notification을 post 했다면 publisher는 emit 하지 않고 그게 중요한 차별점이다.
publisher는 최소 한개의 subscriber 가 있을때만 emit 한다는 점이다.
Subscribing with sink(_:_:)
이전 코드에 이 코드를 더한다.
// 1
let subscription = publisher.sink { _ in
print("Notification received from a publisher!")
}
// 2
center.post(name: myNotification, object: nil)
// 3
subscription.cancel()
- publisher 에 .sink를 호출해 subscription을 만든다.
- notification 을 post 한다.
- subscription을 cancel 한다.
sink는 잠기는 느낌이 아닌 subscriber를 closure 와 함께 붙여 publisher 의 output을 처리하는 간단한 방법이다.
이 예제에서는 closure는 무시하고 대신 message를 출력해여 notification 이 received되었다는 것만 표시한다.
——— Example of: Publisher ———
Notification received from a publisher!
라고 console 에 뜰 것 이다.
sink operator 는 계속해서 publisher 가 emit 하는 많은 값을 수신한다.
이것은 unlimited demand 라고 알려져있는데 다음번에 배울 것 이다.
sink operator 는 두개의 클로져를 제공한다.
- receiving a completion event
- handle receiving values
어떻게 동작하는지 보려면 해당 코드를 밑에 추가해라
example(of: "Just") {
// 1
let just = Just("Hello world!")
// 2
_ = just .sink(
receiveCompletion: {
print("Received completion", $0)
},
receiveValue: {
print("Received value", $0)
})
}
여기서 너는
- 원시 값타입으로 부터 Just를 사용하려 publisher를 만들었다.
- publisher 에 대한 subscription을 만들어 이벤트에 따라 받은 message를 출력했다.
실행해보면 console 에는 아래와 같이 출력된다.
——— Example of: Just ———
Received value Hello world!
Received completion finished
Just의 설명을 살펴보면 해당 결과값을 각각의 subscriber 에게 한번만 전달하고 끝나는 Publisher 라도 되있다.
다른 subscriber를 붙여보자
_ = just .sink(
receiveCompletion: {
print("Received completion (another)", $0)
},
receiveValue: {
print("Received value (another)", $0)
})
실행 시키면 Just Publisher 가 새로운 subscriber에게 딱 한번만 emit 하고 끝나는 걸 볼 수 있다.
Received value (another) Hello world!
Received completion (another) finished
Subscribing with assign(to:on:)
sink에 추가적으로 assign(to:on:) Operator 는 수신된 값을 KVO-compliant property of object 에게 assign 할 수 있도록 한다.
예를들어
example(of: "assign(to:on:)") {
// 1
class SomeObject {
var value: String = "" {
didSet {
print(value)
}
}
}
// 2
let object = SomeObject()
// 3
let publisher = ["Hello", "world!"].publisher
// 4
_ = publisher
.assign(to: \.value, on: object)
}
- 새로운 값을 출력하는 didSet property observer를 가진 class 를 정의한다.
- class 의 인스턴스를 만든다.
- 문자열 배열에서 publisher를 만든다.
- publisher를 subscribe 해서 수신된 각각의 값을 객체의 value 프로퍼티에 할당한다.
실행시켜 보면
——— Example of: assign(to:on:) ———
Hello
world!
일단 sink operator 에 집중하지만 8장에서 assign 에 대해 알아 볼 것 이다.
Hello Cancellable
subscriber가 할일을 다하고 더이상 publisher 로 부터 값을 받지 않는 경우 subscritpion을 cancel하여 사용하고있는 리소스나 네트워크 콜과 같은 활동들을 멈추어 리소스를 해제하는게 좋은 생각이다.
subscriptions 는 AnyCancellable 인스턴스를 cancellation token 처럼 리턴한다. (작업이 끝나고 subscription을 cancel 할 수 있는)
AnyCancellable 프로토콜은 Cancellable 프로토콜을준수하는데 cancel() 메소드를 딱 아까 말했던 목적때문에 가지고 있다.
Subscriber example 에서 subscription을 cancel 해보자
// 1
center.post(name: myNotification, object: nil)
// 2
subscription.cancel()
- notification 을 post 한다.
- subscription은 cancel 한다. subscription에서 cancel() 메서드를 호출할 수 있다. 왜냐하면 Subscription protocol 은 Cancellable 를 상속?받기 때문이다.
실행해보면
——— Example of: Subscriber ———
Notification received from a publisher!
만약 명시적으로 cancel()을 호출하지 않으면 publisher 가 complete 할 때 까지 계속되거나 일반적인 메모리 관리자가 저장된 subscription을 내리기 전까지 계속된다. 어느 순간에는 subscription을 cancel 해줄 것 이다.
*subscription에서 온 return value를 무시하는 것도 좋지만 하나 예고하자만 subscription을 전체 프로젝트에 저장하지 않으면 그 subscription은 프로그램 flow 가 scope 를 벗어나는 즉시 cancel 될 것 이다.
이제 이런 기능이 어떻게 작동되는지 알아보자
Understanding what's going on
UML(unified modeling language) 그림으로 보면
- subscriber 는 publisher를 구독한다.
- publisher 는 subscription을 만들어 subscriber 에게 전달한다.
- subscriber 가 값을 요청한다.
- publisher 가 값을 전송한다.
- publisher 가 completion을 전송한다.
*해당 내용의 세부 내용은 18장에서 배우게 된다.
Publisher protocol 과 중요한 extension들을 보면
public protocol Publisher {
// 1
associatedtype Output
// 2
associatedtype Failure : Error
// 4
func receive<S>(subscriber: S)
where S: Subscriber,
Self.Failure == S.Failure,
Self.Output == S.Input
}
extension Publisher {
// 3
public func subscribe<S>(_ subscriber: S)
where S : Subscriber,
Self.Failure == S.Failure,
Self.Output == S.Input
}
- publisher 가 만들 수 있는 값의 타입
- publisher 가 만들 수 있는 error 타입 또는 에러가 없는게 보장될 경우 Never 할당
- subscriber는 publisher 와 붙을 수 있게 subscribe(_:) 함수를 호출
- subscribe(_:)의 구현은 receive(subscriber:)를 호출하여 subscriber 를 publisher 에게 연동시킴(creation of subscription)
associated types 는 publisher interface 로 subscriber subscription을 만들기 위해 무조건 일치시켜야 한다.
이제 Subscriber protocol 을 보면
public protocol Subscriber: CustomCombineIdentifierConvertible {
// 1
associatedtype Input
// 2
associatedtype Failure: Error
// 3
func receive(subscription: Subscription)
// 4
func receive(_ input: Self.Input) -> Subscribers.Demand
// 5
func receive(completion: Subscribers.Completion<Self.Failure>)
}
- subscriber 가 수신할 수 있는 값 타입
- subscriber 가 수신할 수 있는 error 타입, 에러를 받는 경우가 없는게 보장되면 Never 할당
- publisher는 subscriber 에게 subscription을 주기 위해 receive(subscription:) 을 호출한다.
- publisher 는 subscriber의 receive(_:)를 호출해 방금 published 된 새로운 값을 subscriber 에게 전달
- publisher 는 subscriber 의 receive(completion:) 를 호출해 값을 만드는게 끝났거나 에러가 발생했다고 알림
publisher 와 subscriber 의 연결을 subscription이라고 한다. 여기서 Subscription 프로토콜을 보면
public protocol Subscription: Cancellable,
CustomCombineIdentifierConvertible {
func request(_ demand: Subscribers.Demand)
}
subscriber는 request(_:)를 호출해 값을 최대 무한대의 값을 더 받고 싶음을 나타낸다.
* subscriber 가 얼마나 많은 갑을 받고싶은지에 대해 말하는 개념은 backpressure management 라고 불린다.
**Backpressure is when the progress of turning that input to output is resisted in some way (배압)
컴포넌트가 대처할 수 없고 장애가 발생해선 안 되기 때문에 컴포넌트는 상류 컴포넌트들에 자신이 과부하 상태라는 것을 알려 부하를 줄이도록 해야 한다.
Creating custom subscriber
코드를 작성한다.
example(of: "Custom Subscriber") {
// 1
let publisher = (1...6).publisher
// 2
final class IntSubscriber: Subscriber {
// 3
typealias Input = Int
typealias Failure = Never
// 4
func receive(subscription: Subscription) {
subscription.request(.max(3))
}
// 5
func receive(_ input: Int) -> Subscribers.Demand {
print("Received value", input)
return .none
}
// 6
func receive(completion: Subscribers.Completion<Never>) {
print("Received completion", completion)
}
}
}
- 정수들 값을 범위로 만들어 publisher 로 만든다.
- IntSubscriber라는 custom subscriber 를 만든다.
- 타입 별명을 붙여서 이 subscriber는 오직 정수 입력만 받을 수 있고 절대 에러가 발생하지 않는다고 명시한다.
- 필요한 메소드를 구현한다.
- receive(subscription:) : publisher 에게 호출된다.
그리고 해당 메소드 안에 .request(_:)를 호출해 subscriber가 subcription을 통해 최대 3개의 값을 받을 수 있게 해준다.
- receive(subscription:) : publisher 에게 호출된다.
- 수신될때 각각의 값을 출력하고 .none을 리턴하여 demand를 수정하지 않를 것을 표시한다. .none은 .max(0)과 같다.
- completion event 를 출력한다.
publisher 가 어떤걸 publish 하던 subscriber가 있어야 하니까 추가한다.
let subscriber = IntSubscriber()
publisher.subscribe(subscriber)
이 코드에서는 출력값의 타입과 Failure 타입이 같은 subscriber를 만들고 publisher 에게 subscrib 하라고 한다.
실행해보면
——— Example of: Custom Subscriber ———
Received value 1
Received value 2
Received value 3
completion event 를 받지 못한 걸 볼수 있다. 왜냐하면 publisher 는 최대 3개만 값을 받을 수 있기 때문이다.
receive 에서 .none 을 .unlimited로 바꾸면 모든값이 다 출력되고 completion event 도 출력되는걸 볼 수 있다.
func receive(_ input: Int) -> Subscribers.Demand {
print("Received value", input)
return .unlimited
}
——— Example of: Custom Subscriber ———
Received value 1
Received value 2
Received value 3
Received value 4
Received value 5
Received value 6
Received completion finished
.unlimited를 .max(1)로 바꾸고 다시 실행시켜 보면 .unlimied와 같은 결과가 출력되는걸 볼 수 있는데 왜냐하면 각각 이벤트를 받을 때 마다 1씩 max를 증가시키고 있기 때문이다.
다시 .max(1)를 .none으로 바꾸고 publisher 정의를 문자의 배열로 바꿔본다.
let publisher = ["A", "B", "C", "D", "E", "F"].publisher
실행시켜보면 subscribe 메소드에서 error가 발생하는데 publisher 와 subscriber 의 Input 과 Failure type 이 맞지않기 때문이다.
publisher를 다시 원래대로 정수 범위로 바꿔줘 error 를 해결한다.
Hello Future
Just를 사용해서 하나의 값만 emit 하는 publisher 를 만들어 본것 처럼 Future 도 비동기적으로 하나의 결과값을 만들고 complete 한다.
example(of: "Future") {
func futureIncrement(
integer: Int,
afterDelay delay: TimeInterval) -> Future<Int, Never> {
}
}
여기서 factory cuntion을 만들어 Int와 Never 타입을 가지는 future을 반환한다.
또한 subscriptions 집합을 추가해 future에 subscrition들을 저장할 것 이다.
길게 동작하는 비동기 작업들은 subscription을 저장하지 않으면 현재 code 의 scope 가 끝날 때 cancelation을 초래한다.
(플레이그라운드에 경우 바로 cancelation이 발생한다.)
다음으로 함수 body를 채워 future 을 만든다.
Future<Int, Never> { promise in
DispatchQueue.global().asyncAfter(deadline: .now() + delay) {
promise(.success(integer + 1))
}
}
이 코드는 fure를 정의해서 caller 에 의해 명시된 값을 사용해 integer 를 delay 이후에 증가시킬 것이라는 promise 를 만든다.
Future는 결국 single value 를 만들고 끝나는 Pulblisher 이다.
그리고 값이나 에러가 가능해 졌을 때 closure 를 invoke 하고 해당 invoked 된 closure 는 promise 로 언급된다.
Future 의 정의를 보자면
final public class Future<Output, Failure> : Publisher
where Failure: Error {
public typealias Promise = (Result<Output, Failure>) -> Void ...
}
Promise 는 Future 에 의해 published 된 single value 또는 error 를 가지는 Result 를 수신하는 클로져의 별명이다.
futureincrement 정의에 코드를 추가해본다.
// 1
let future = futureIncrement(integer: 1, afterDelay: 3)
// 2
future
.sink(receiveCompletion: { print($0) },
receiveValue: { print($0) })
.store(in: &subscriptions)
- 이전에 만든 factory function을 사용하여 future을 만들고 3초 뒤에 전달한 정수값을 증가시키라고 적어놨다.
- subscribe 하고 값과 completion event 를 수신하고 resulting subscription을 subscriptions set 에 저장한다.
이 챕터 뒤에서 subscriptions을 collection에 저장하는걸 배우게 된다.
실행시켜 보면 다음과 같이 콘솔에 찍힌다.
——— Example of: Future ———
2
finished
두번째 subscription을 future에 더해라
future
.sink(receiveCompletion: { print("Second", $0) },
receiveValue: { print("Second", $0) })
.store(in: &subscriptions)
playground를 실행하기전에 다음 출력문을 futureIncrement 함수의 DisPatchQueue블록에 넣어라
print("Original")
실행시켜 보면 특정 delay 이후 두번째 subscription이 같은 값을 수신하는걸 알 수 있다.
future은 promise 를 다시 실행시킨게 아니라 결과값을 share, replay 한 것 이다.
——— Example of: Future ———
Original
2
finished
Second 2
Second finished
Original 출력은 subscription이 일어난 다음 바로 출력된다.
기서은 future 가 만들어진 직후 바로 실행하기 때문이다.
일반적인 publisher 와는 다르게 subscriber가 필요없다.
앞서 예제들로 제한된 갯수만큼 Publish 되는 값에 대해서 작업해 보았다.
notification center 예제는 publisher 의 예시로써 무제한으로 아래의 2개를 제공하며 비동기적으로 value 를 publish 할 수 있었다.
- notification sender 가 notifications 를 emit 한다.
- 특정 notification 에 대한 구체적인 subscriber 가 있다.
위 작업을 내 코드로 똑같이 해볼 수 있는 방법인 Subject 에 대해서 알아본다.
Hello Subject
publisher 와 subscriber와 custom subscriber를 만드는 방법까지 공부해 봤다면 이후에는 custom publisher를 만드는 방법을 배우게 될 것 이다.
일단 지금은 subject 에 대해서 배워보자
subject는 combine 이 아닌 코드에서 combine subscriber 에게 value 를 전송할 수 있게 해준다.
example(of: "PassthroughSubject") {
// 1
enum MyError: Error {
case test
}
// 2
final class StringSubscriber: Subscriber {
typealias Input = String
typealias Failure = MyError
func receive(subscription: Subscription) {
subscription.request(.max(2))
}
func receive(_ input: String) -> Subscribers.Demand {
print("Received value", input)
// 3
return input == "World" ? .max(1) : .none
}
func receive(completion: Subscribers.Completion<MyError>) {
print("Received completion", completion)
}
}
// 4
let subscriber = StringSubscriber()
}
- custom error type 을 정의한다.
- MyError 에러와 strings를 수신하는 custom subscriber를 정의한다.
- 수신된 값에 대해 요구사항을 반영한다.
- custom subscriber instance 를 만든다.
입력값이 "World"일 때 .max(1)을 receive(_:) 에 반환하면 새로운 max 는 3으로 된다. (원래 max 값 + 1)
custom error type을 정의하고 요구에 맞게 수신된 값을 바꾸는 것 외엔 새로운 건 없다.
이제 흥미로운 부분이 추가되는데
// 5
let subject = PassthroughSubject<String, MyError>()
// 6
subject.subscribe(subscriber)
// 7
let subscription = subject .sink(
receiveCompletion: { completion in
print("Received completion (sink)", completion)
},
receiveValue: { value in
print("Received value (sink)", value)
}
)
- String 타입과 custom error type 의 PassthroughSubject 인스턴스를 만든다.
- subscriber 를 subject 에 subscribes 한다. (구독자를 주제에 구독한다)
- sink 를 통해 다른 subscription을 만든다.
Passthrough subject 는 새로운 값을 publish 할 수 있게 해준다.
이런 값들과 completion event 를 전달 하는데 어느 publisher 든 먼저 값의 type 과 발생할 수 있는 error 에 대해서 먼저 선언해야 한다.
subscribers는 무조건 input 과 failture type 에 맞춰져야 비로소 passthrough subject를 subscribe할 수 있다.
이제 passthrough subject를 만들었으니 값을 보내보자
subject.send("Hello")
subject.send("World")
이 코드는 한번에 하나씩 2개의 값을 보낸다.
——— Example of: PassthroughSubject ———
Received value Hello
Received value (sink) Hello
Received value World
Received value (sink) World
각각의 subscriber 는 published 된 값을 수신한다.
subscription.cancel()
subject.send("Still there?")
- 두번째 subscription 을 cancel 하고
- 다른 값을 보낸다.
Received value Hello
Received value (sink) Hello
Received value World
Received value (sink) World
Received value Still there?
코드를 더 추가하면
subject.send(completion: .finished)
subject.send("How about another one?")
실행해보면 subscriber는 "How about another one?"는 수신하지 않는다. 왜냐하면 completion event 를 value가 전해진 바로 다음에 받기 때문이다. 첫번째 subscriber는 completion event나 값을 받지 못한다. 왜냐하면 subscription이 이전에 canceled 되었기 때문이다.
——— Example of: PassthroughSubject ———
Received value Hello
Received value (sink) Hello
Received value World
Received value (sink) World
Received value Still there?
Received completion finished
completion event를 보내기 전 line에 코드를 추가해 보면
subject.send(completion: .failure(MyError.test))
——— Example of: PassthroughSubject ———
Received value Hello
Received value (sink) Hello
Received value World
Received value (sink) World
Received value Still there?
Received completion failure(...MyError.test)
첫번째 subscriber 에 의해 에러가 받아진다. 하지만 error 이후에 보내진 completion event 는 아니다.
이것은 publisher 가 single completion event를 보냈 을 경우 (에러이든 아니던) 끝나는 것이다.
PassthroughSubject로 값을 전달하는 방법은 명령형 코드와 선언형 세계를 이어주는 다리 역할인 것 이다.
하지만 가끔은 publisher 가 가진 현재값을 보고싶을 경우가 있다. 그럴 때 CurrentValueSubject를 사용한다.
subscription 각각의 값을 저장하는 대신 AnyCancellable의 콜렉션에 subscription을 담다둘 수 있다.
이 collection은 더해진 subscription들을 자기 자신이 deinit 될때 함께 cancel 시킨다.
example(of: "CurrentValueSubject") {
// 1
var subscriptions = Set<AnyCancellable>()
// 2
let subject = CurrentValueSubject<Int, Never>(0)
// 3
subject
.sink(receiveValue: { print($0) })
.store(in: &subscriptions) // 4
}
- subscriptions set 를 만든다.
- CurerntValueSubject 를 만든다. (초기값 0)
- subject를 sink 를 통해 구독하고 값을 출력하게 한다.
- subscription set 에 subscription을 저장하고 inout 파라미터를 통해 copy 대신 업데이트 되게 한다.
——— Example of: CurrentValueSubject ———
0
실행시켜 보면 초기값 0을 출력하는 것을 볼 수 있다.
subject.send(1)
subject.send(2)
두개를 더 보내면 1,2 가 찍힌다.
passthroughSubject 와는 다르게 current value 를 .value 프로퍼티를 통해 물어볼 수 있다.
subject.value
또한 send(_:)를 통해 새로운 값을 보낼 수 있다. 다른 방법으로는 value 프로퍼티에 그냥 값을 할당하면된다.
subject.value = 3
단 value 프로퍼티에는 .finished 와 같은 event 는 보낼 수 없기에 send(_:) 메소드를 사용해야 한다.
Dynamically adjusting demand