채널은 오케스트레이션(orchestration)을 위해 존재한다. 채널을 쓰면 2개의 고루틴(Goroutine)이 특정 워크플로우를 함께 처리하도록 할 수 있으며 또한 우리가 원하는 대로 워크플로우를 관리할 수 있다. 채널이 큐(queue)처럼 선입선출(first-in-first-out)로 구현된 듯 하지만 우리가 생각해야 하는 점은 채널이 큐라는 것이 아니다. 채널을 큐라고 생각하면 개념을 이해하기 어려워진다. 대신에 채널은 또다른 고루틴에 이벤트(event)가 발생했다는 신호를 주는 방식이라고 생각해야 한다. 멋지게도, 데이터가 있을 때는 물론 데이터 없이도 이벤트 신호를 줄 수 있다.
우리가 무슨 일을 하든 신호 주기(signaling)를 염두에 둔다면 채널을 적절한 방식으로 쓸 수 있다. 고(Go)에는 두 종류의 채널이 있다. 버퍼 없는(unbuffered) 채널과 버퍼 있는(buffered) 채널이다. 두 종류 중 어떤 것을 쓰든 신호를 주며 데이터를 보낼 수 있다. 중요한 차이점은 버퍼 없는 채널을 쓰면 신호를 줄 때 그 신호가 전달되었다는 보장을 받을 수 있다는 점이다. 고루틴이 우리가 할당한 작업을 끝냈는지 아닌지에 대해서는 알 수 없지만 적어도 그 보장은 받을 수 있다. 대신 신호가 전달되었다는 보장을 받는 대가로 더 긴 지연 시간(latency)을 감수해야 한다. 왜냐하면 버퍼 없는 채널의 반대편에 있는 고루틴이 데이터를 받았다는 것을 확실히 하기 위해 그 시점까지 기다려야 하기 때문이다.
버퍼 없는 채널이 작동하는 방식은 다음과 같다. 채널에 신호를 주려는 고루틴이 있다고 해보자. 채널은 신호와 함께 데이터도 보내려고 한다. 고루틴은 채널에 바로 데이터를 넣을 것이다. 하지만 채널은 반대편에 신호를 받을 고루틴이 있는지 알아야 하기 때문에 그때까지 데이터는 잠기고(locked) 움직일 수 없게 된다. 두 고루틴 모두 채널에 관여하고 있지 않고 있다. 마침내 고루틴이 와서 데이터를 받겠다고 말할 때, 데이터를 전송할 수 있다.
여기에 버퍼 없는 채널이 신호가 전달되었다는 보장을 해줄 수 있는 이유가 있다. 신호 받기가 먼저 일어난다. 신호 받기가 일어나면 신호가 전달되었다는 것을 알 수 있고, 이제야 다른 일을 하러 갈 수 있다.
버퍼 없는 채널은 매우 강력한 채널이다. 이 보장을 최대한 활용하면 좋다. 하지만 다시 한번 말하건대 보장의 대가는 기다림 때문에 일어나는 긴 지연 시간이다.
버퍼 있는 채널은 약간 다르다. 우리는 신호가 전달되었다는 보장을 얻지 못하는 반면 신호 주기와 받기 모두에서 지연 시간을 줄일 수 있다.
앞의 예시로 되돌아가보자. 버퍼 없는 채널을 버퍼 있는 채널로 바꿨다고 해보자. 버퍼가 1밖에 안되는 채널을 쓴다고 가정할 것이다. 이는 채널 안에 1개의 데이터 조각을 위한 공간이 있다는 뜻이며 반대편이 데이터를 받을 때까지 기다릴 필요가 없다는 뜻이다. 따라서 이제 어떤 고루틴이 와서 데이터를 채널에 넣고 바로 가버릴 수 있다. 다시 말하면 신호 보내기가 신호 받기보다 먼저 일어난다. 고루틴이 신호 주기에 대해 아는 전부는 신호를 보냈고, 데이터를 넣었다는 것 뿐이며 신호가 언제 전달될지에 대해서는 아는 것이 없다. 이제 다른 고루틴이 와서 데이터가 있는 것을 보고 그걸 받아 가기를 바랄 뿐이다.
버퍼가 1개 있는 채널은 이런 종류의 지연 시간을 다룰 때 쓴다. 더 큰 버퍼가 필요할 때도 있지만 1개 이상의 버퍼를 쓸 때 적용되는 설계 규칙을 나중에 배우게 될 것이다. 하지만 신호 주기가 들어오고 있고 그 신호들이 잠길(locked) 가능성이 있는 상황이라면 우리는 다시 생각해봐야 한다. 버퍼 1개 채널이 우리가 맞닥뜨린 지연 시간을 줄이는 데 충분한가? 왜냐하면 앞으로 우리는 신호를 보낼 때마다 버퍼 있는 채널이 항상 비어있기를 바랄 것이기 때문이다.
버퍼 있는 채널은 성능을 위한 것이 아니다. 버퍼 있는 채널은 연속성을 위해, 바퀴가 계속 굴러가게 하기 위해 써야 한다. 우리가 알아두어야 할 것 중 하나는 모든 것이 문제 없이 동작할 때 잘 돌아가는 코드를 짜는 것은 아무나 할 수 있다는 점이다. 문제들이 생겨날 때가 바로 아키텍쳐와 엔지니어가 정말 중요해지는 시점이다. 우리가 만든 소프트웨어는 스트레스를 받지 않는다.[재방문] 스트레스를 받는 것은 우리다. 우리가 책임감을 가져야 한다.
예시로 돌아와서, 보낸 신호가 전달되었는지 정확히 아는 것이 중요하지는 않지만 신호가 확실히 전달되도록 해야 할 필요는 있다. 1개짜리 버퍼를 가진 채널은 거의 확실한 보장을 해준다. 왜냐하면 신호 보내기를 하고, 데이터를 집어넣고, 돌아섰다가, 다시 돌아왔을 때에 버퍼가 비워져 있는 것을 보기 때문이다. 이제 우리는 신호가 전달되었다는 것을 알 수 있다. 신호를 보냈을 시점에 즉시 알 수는 없지만 1개 짜리 버퍼를 씀으로써 돌아왔을 때 버퍼가 비어있다는 것은 알 수 있다.
그러고 나면 또다른 데이터 조각을 채널에 넣을 수 있다. 그리고 운이 좋다면 다시 돌아왔을 때 그 데이터는 사라져 있을 것이다. 만약 사라져 있지 않다면 문제다. 데이터를 받는 쪽에서 문제가 생긴 것이다.[재방문] 채널이 비워지기 전까지는 앞으로 나아갈 수 없다. 데이터가 왜 계속 머물러 있는지 알아야 하기 때문에 이런 문제는 즉시 보고해야 한다. 이것이 안정적인 시스템을 짓는 방법이다.
더 많은 일을 가져오지 말아야 한다.[재방문] 문제가 생겼을 때 데이터를 받는 쪽을 조사해야 하므로 시스템에 더 많은 부하를 주어서는 안된다. 우리가 책임질 수 없는 일에 더 많은 책임을 지어서는 안된다.
package main
import (
"fmt"
"time"
)
func main() {
fmt.Printf("\n=> Basics of a send and receive\n")
basicSendRecv()
fmt.Printf("\n=> Close a channel to signal an event\n")
signalClose()
}
basicSendRecv는 신호 주기오 받기의 기본을 보여준다. make 함수는 채널을 만들 때 쓴다. make를 쓰지 않고서는 유용한 채널을 만드는 다른 방법은 없다. 채널은 우리가 신호에 담아 보낼 데이터의 타입에 기반한다. 이 경우에는 string을 썼다. 이 채널은 참조(reference) 타입이다. ch는 단지 수면 아래 있는 거대한 데이터 구조를 가리키는 포인터 변수이다.
func basicSendRecv() {
아래는 버퍼 없는 채널이다.
ch := make(chan string)
go func() {
아래는 신호 보내기다. 신호 주기는 이항 연산자(binary operation)이며 채널 쪽을 가리키는 화살표 기호를 사용한다. 여기서는 "hello"라는 string 변수로 신호를 주고 있다. This is a send: a binary operation with the arrow pointing into the channel. We are signaling with a string "hello".
ch <- "hello"
}()
아래는 신호 받기다. 신호 받기 역시 화살표지만 채널 왼쪽에 붙어 있으며 단항 연산자다. 이는 데이터가 채널에서 나오고 있다는 것을 보여준다. 이제 우리는 신호 보내기와 받기가 함께 있어야 하는 버퍼가 없는 채널을 갖고 있다. 또한 신호 받기가 먼저 일어나기 때문에 우리는 신호가 전달되었다는 것을 알 수 있다. 신호 보내기와 받기 모두 둘 모두가 모여서 전달이 일어날 수 있기 전까지는 멈출(block) 것이다.
fmt.Println(<-ch)
}
signalClose는 이벤트를 신호로 주기 위해 채널을 닫는(close) 법을 보여준다.
func signalClose() {
여기서는 빈 구조체(struct)를 써서 채널을 만든다. 이는 데이터가 없는 신호이다.
ch := make(chan struct{})
이제 작업을 하기 위해 고루틴을 시작해보자. 고루틴이 100 밀리초(millisecond)가 걸린다고 가정해보자. 고루틴이 일을 마쳤을 때 또다른 고루틴에게 신호를 주려고 한다. 일이 끝났다는 것을 데이터가 없이 알리기 위해 채널을 닫을 것이다. 버퍼가 있든 없든 채널을 만들 때 채널은 두 상태(state) 중 하나에 놓일 수 있다. 모든 채널은 열린(open) 상태에서 시작해서 우리는 데이터를 주고 받을 수 있다. 채널을 닫힌(closed) 상태로 변경하면 다시 열릴 수 없다. 또한 채널을 두번 닫을 수도 없다. 정합성(integrity) 문제 때문이다. 데이터를 두번 보내지 않고는 신호를 두번 보낼 수 없다.
go func() {
time.Sleep(100 * time.Millisecond)
fmt.Println("signal event")
close(ch)
}()
채널이 닫히면 신호를 받는 쪽은 즉시 반환(return)된다. 열려 있는 채널에서 신호를 받으려고 하면 데이터 신호를 받기 전까지 반환될 수 없다. 하지만 닫혀 있는 채널에서 신호를 받는다면 데이터 없이도 신호를 받을 수 있다. 우리는 이벤트가 일어났다는 것을 안다. 그 채널에 일어나는 모든 신호 받기는 즉시 반환될 것이다.
<-ch
fmt.Println("event received")
}
=> Basics of a send and receive
hello
=> Close a channel to signal an event
signal event
event received
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
fmt.Printf("\n=> Double signal\n")
signalAck()
fmt.Printf("\n=> Select and receive\n")
selectRecv()
fmt.Printf("\n=> Select and send\n")
selectSend()
fmt.Printf("\n=> Select and drop\n")
selectDrop()
}
signalAck은 어떻게 이벤트를 신호로 줄 수 있고 처리가 끝났다는 승인(acknowledgement)을 기다릴 수 있는지 보여준다. 이는 신호가 전달되었다는 보장을 해줄 뿐만 아니라 작업이 끝난 때를 알 수 있다. 이는 이중 신호와 비슷하다.
func signalAck() {
ch := make(chan string)
go func() {
fmt.Println(<-ch)
ch <- "ok done"
}()
이 고루틴은 신호가 전달되기 전까지 멈춘(block)다. 즉 우리가 신호를 받기 전까지 이 고루틴은 움질일 수 없다.
ch <- "do this"
fmt.Println(<-ch)
}
=> Double signal
do this
ok done
신호 고르기는 고루틴이 신호 보내기와 받기를 할 때 한번에 여러 채널을 다루는 방식이다. 이 방식은 이벤트 루프(event loop)를 만들 때 매우 쓸모 있지만 공유 상태(shared state)를 직렬화(serializing)하는 데는 별로 좋지 않다. selectRecv는 고르기(select) 문을 써서 값(value)을 받을 때까지 특정 시간만큼을 기다리는 방식을 보여준다.
func selectRecv() {
ch := make(chan string)
특정 시간만큼을 기다리고 신호 보내기를 수행한다.
go func() {
time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
ch <- "work"
}()
2개의 서로 다른 채널에서 2개의 서로 다른 신호를 받는다. 한 채널은 위에서 본 것이고 다른 한 채널은 시간을 재기 위해서다. time.After는 주어진 만큼의 시간이 지난 뒤 현재 시간 신호를 보내는 채널을 돌려준다. 우리는 작업을 처리하면서 보내는 신호를 받고 싶지만 끝없이 기다릴 생각은 없다. 우리는 100 밀리초만 기다리고 그 뒤에는 다음으로 넘어갈 것이다.
select {
case v := <-ch:
fmt.Println(v)
case <-time.After(100 * time.Millisecond):
fmt.Println("timed out")
}
하지만 이 코드에는 매우 흔한 버그(bug)가 있다. 우리가 앞으로 보게 될 가장 커다란 버그 중 하나는 위와 같은 코드를 짜고 고루틴에게 종료될 기회를 주지 않을 때 생긴다. 버퍼 없는 채널을 쓰고 있는데 이 고루틴은 언젠가는 기다리는 시간이 끝나고 신호 보내기를 하고 싶을 것이다. 하지만 이것은 버퍼 없는 채널이다. 신호 보내기는 대응하는 신호 받기가 없으면 완료될 수 없다. 만약 고루틴이 시간 초과되고 그 다음으로 넘어간다면 어떻게 될까? 대응하는 신호 받기가 없으므로 고루틴 누수(leak)가 생길 것이다. 다시 말해 이 고루틴은 결코 종료되지 않을 것이다.
이 버그를 고치는 가장 깔끔한 방법은 1개짜리 버퍼가 있는 채널을 쓰는 것이다. 신호 보내기가 일어날 때 신호가 전달되리라는 보장을 할 수는 없다. 하지만 우리는 이 보장이 필요 없다. 단지 신호를 보낼 필요가 있을 뿐이며 보내고 나서는 다음으로 넘어갈 수 있다. 그러므로 반대쪽에서 신호를 받거나 아니면 못 받고 그냥 넘어갈 것ㅇ다. 신호를 받지 못하더라도 신호 보내기가 일어나도록 신호를 담을 자리가 버퍼에 있기 때문에 이 신호 보내기는 여전히 완료된다.
=> Select and receive
work
selectSend는 신호 보내기를 시도할 때 특정 시간 동안만 시도하기 위해 신호 고르기 문(select statement)을 어떻게 쓸 수 있는지를 보여준다.
func selectSend() {
ch := make(chan string)
go func() {
time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
fmt.Println(<-ch)
}()
select {
case ch <- "work":
fmt.Println("send work")
case <-time.After(100 * time.Millisecond):
fmt.Println("timed out")
}
위 함수와 비슷하게 고루틴 누수는 일어날 것이다. 다시 한번, 1개짜리 버퍼 채널이 여기서 우리를 구해줄 것이다.
=> Select and send
work
send work
selectDrop은 신호 고르기(select)를 써서 채널이 즉시 막혔을(block) 때 넘어가는 방식을 보여준다.
이는 매우 중요한 패턴이다. 서버에 할일이 들이닥쳤거나 할일이 곧 오는 상황을 상상해보자. 서버가 일을 맡길 모듈은 제대로 동작하고 있지 않다.[재방문] 그렇다고 일을 그냥 쌓아놓을 수도 없다. 이 때 우리는 앞으로 계속 나아가기 위해서 일을 버려야 한다.
서비스 거부 공격(Denial-of-service attack)은 좋은 예시이다. 우리는 서버로 오는 수많은 요청을 받는다. 우리가 요청 하나하나 모두 처리하려고 든다면 아마 서버는 터지고 말 것이다. 우리는 처리할 수 있는 것은 처리하고 다른 요청은 버려야 한다.
이런 종류의 패턴(fanout)을 써서 우리는 몇몇 데이터를 버리려고 한다. 이를 위해 1개보다 큰 버퍼를 쓸 수 있다. 버퍼가 얼마나 커야 할지는 재보아야 한다. 아무렇게나 정할 수는 없다.
func selectDrop() {
ch := make(chan int, 5)
go func() {
여기서 우리는 신호 받기 루프(loop)에서 작업할 데이터가 오기를 기다리고 있다.
for v := range ch {
fmt.Println("recv", v)
}
}()
아래 코드는 작업을 채널에 보낼 것이다. 버퍼가 다 차면, 버퍼는 막히게(block) 되고, 기본 케이스(default case)가 실행되며 작업을 버리게 된다.
for i := 0; i < 20; i++ {
select {
case ch <- i:
fmt.Println("send work", i)
default:
fmt.Println("drop", i)
}
}
close(ch)
}
=> Select and drop
send work 0
send work 1
send work 2
send work 3
send work 4
send work 5
drop 6
drop 7
drop 8
drop 9
drop 10
drop 11
drop 12
drop 13
drop 14
drop 15
recv 0
recv 1
recv 2
recv 3
recv 4
recv 5
drop 16
send work 17
send work 18
send work 19
아래 프로그램은 고루틴 2개를 테니스 매치에 넣을 것이다. 여기서는 공이 양쪽 편에서 쳐지거나 놓쳤다는 보장을 필요로 하기 때문에 버퍼 없는 채널을 쓴다.
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
func main() {
버퍼 없는 채널을 생성한다.
court := make(chan int)
wg는 동시성을 관리하기 위해 쓰인다.
var wg sync.WaitGroup
wg.Add(2)
선수 둘을 입장시킨다. 둘 모두 받기 모드에서 시작할 것이다. 어떤 선수가 먼저 공을 받게 될지는 알 수 없다. 메인 고루틴을 심판이라고 생각하자. 누가 공을 먼저 받는지는 심판에게 달려 있다.
go func() {
player("Hoanh", court)
wg.Done()
}()
go func() {
player("Andrew", court)
wg.Done()
}()
테니스 경기를 시작한다. 메인 고루틴이 신호 보내기를 한다. 선수 둘 모두 신호 받기 모드이므로 어느 쪽이 먼저 공을 받게 될지 알 수 없다.
court <- 1
경기가 끝날 때까지 기다린다.
wg.Wait()
}
player는 테니스 경기를 하는 사람을 흉내낸다. 값 의미(value semantic)를 써서 채널 값을 요구한다.[재방문]
func player(name string, court chan int) {
for {
공이 다시 넘어올 때까지 기다린다. 이게 또다른 형태의 신호 받기라는 점을 놓치지 말자. 단순히 값을 받는 대신에 신호 받기가 어떻게 반환되었는지 나타내는 플래그(flag)를 받을 수 있다. 만약 신호가 데이터 때문에 생겼다면 ok는 true일 것이다. 만약 신호가 데이터 없이 생겼다면, 다른 말로 채널이 닫혔다면 ok는 false일 것이다. 이를 통해 누가 이겼는지 결정할 수 있다.
ball, ok := <-court
if !ok {
만약 채널이 닫혔다면 우리가 이긴 것이다.
fmt.Printf("Player %s Won\n", name)
return
}
무작위 값을 하나 정해서 공을 놓쳤는지 (즉, 우리가 졌는지) 알아본다. 만약 경기에서 진다면 채널을 닫을 것이다. 그러면 반대편 player는 데이터 없이 신호를 받았다는 것을 알게될 것이다. 채널은 닫히고 반대편 player가 이긴다. player 둘 모두 반환한다.[재방문]
n := rand.Intn(100)
if n%13 == 0 {
fmt.Printf("Player %s Missed\n", name)
우리가 졌다는 신호를 보내기 위해 채널을 닫는다.
close(court)
return
}
공을 친 횟수를 보여주고 하나 증가시킨다. 만약 위에서 말한 두가지 경우가 생기지 않는다면 경기는 아직 진행 중이다. 공의 값을 하나 증가시키고 신호 보내기를 한다. 반대편 player는 여전히 신호 받기 모드에 있다는 것을 우리는 안다. 그러므로 신호를 보내는 쪽과 받는 쪽은 결국 함께 모일 것이다. 다시 한번, 버퍼 없는 채널에서는 전달이 보장되기 때문에 신호 받기가 먼저 일어난다.
fmt.Printf("Player %s Hit %d\n", name, ball)
ball++
공을 쳐서 상대 선수에게 다시 보낸다.
court <- ball
}
}
Player Andrew Missed
Player Hoanh Won
이 프로그램은 고루틴 네 개 간의 이어 달리기를 흉내내기 위해 버퍼 없는 채널을 쓰는 방식을 보여준다. 달리기 선수 네 명이 트렉에 있다고 상상해보자. 한번에 한 명만 달릴 수 있으며 마지막 선수를 제외하고는 다음에 달릴 사람이 있다. 다음 선수는 앞 선수에 이어서 달릴 때까지 기다린다.
package main
import (
"fmt"
"sync"
"time"
)
wg는 프로그램이 끝나기까지 기다리기 위해 쓰인다. var wg sync.WaitGroup
func main() {
버퍼 없는 채널을 생성한다.
track := make(chan int)
마지막 선수를 위해 값이 1인 횟수를 더한다.[재방문] 우리가 관심 있는 건 마지막 선수가 우리에게 끝났다고 알려주는 것뿐이기 때문에 1만 더한다.
wg.Add(1)
첫번째 선수를 출발선 상에 생성한다.[재방문]
go Runner(track)
메인 고루틴이 달리기를 시작한다. (신호총을 쏜다) 이 순간에 우리는 다른 쪽에서는 고루틴이 신호 받기를 하고 있다는 것을 알고 있다.
track <- 1
달리기가 끝나기를 기다린다.
wg.Wait()
}
Runner는 이어 달리기에서 달리는 사람을 흉내낸다. Runner는 처음부터 끝까지 모든 일을 하고 종료될 것이기 때문에 루프(loop)를 갖고 있지 않다. 우리는 이 패턴이 동작하게 하기 위해 고루틴 Runner를 계속 더할 것이다.
func Runner(track chan int) {
바통이 넘겨진 횟수다.
const maxExchanges = 4
var exchange int
데이터와 함께 바통을 받을 때까지 기다린다.
baton := <-track
트렉을 달리기 시작한다.
fmt.Printf("Runner %d Running With Baton\n", baton)
출발선에 있는 새로운 선수다. 이 선수가 달리기 마지막 선수인가? 아니라면 몇번째 선수인지 기록하기 위해 데이터에 1을 더한다. 우리는 고루틴을 하나 더 만들 것이다. 새로운 고루틴은 즉시 신호 받기 모드에 돌입할 것이다. 이제 트렉에 두번째 고루틴이 있으며 바통을 받기를 기다리고 있다. (1)
if baton < maxExchanges {
exchange = baton + 1
fmt.Printf("Runner %d To The Line\n", exchange)
go Runner(track)
}
트렉을 한바퀴 돈다.
time.Sleep(100 * time.Millisecond)
경기가 끝났는가.
if baton == maxExchanges {
fmt.Printf("Runner %d Finished, Race Over\n", baton)
wg.Done()
return
}
다음 선수에게 바통을 넘겨준다.
fmt.Printf("Runner %d Exchange With Runner %d\n", baton, exchange)
마지막 선수가 아니기 때문에 (1)이 받을 수 있도록 신호 보내기를 한다.
track <- exchange
}
Runner 1 Running With Baton
Runner 2 To The Line
Runner 1 Exchange With Runner 2
Runner 2 Running With Baton
Runner 3 To The Line
Runner 2 Exchange With Runner 3
Runner 3 Running With Baton
Runner 4 To The Line
Runner 3 Exchange With Runner 4
Runner 4 Running With Baton
Runner 4 Finished, Race Over
다음은 1개보다 큰 버퍼가 있는 채널의 고전적인 사용 예시이다. 팬 아웃(Fan Out) 패턴이라고 불린다.
아이디어는 다음과 같다. 고루틴이 할일을 하다가 많은 데이터베이스 작업을 실행하기로 결정했다고 해보자. 이 고루틴은 그 일을 하기 위해 새로운 고루틴을 여러개, 예컨대 10개를 만들어낼 것이다. 각 고루틴은 데이터베이스 작업 2개를 수행할 것이다. 결국 데이터베이스 작업 20개가 고루틴 10개로 쪼개진다. 다시 말하면 원래 있던 고루틴이 고루틴 10개를 팬 아웃(fan out)하고 생성한 모든 고루틴이 작업을 끝내고 보고하기를 기다리는 것이다.
여기서는 버퍼 있는 채널이 딱인데 이는 우리가 미리 20개 작업을 수행하는 고루틴이 10개 있을 것이라는 점을 알기 때문이다. 따라서 버퍼 크기는 20이다. 우리는 결국 마지막에는 작업 신호를 받아야 한다는 점을 알기 때문에 어떤 작업 신호도 막힐 염려가 없다.
package main
import (
"fmt"
"log"
"math/rand"
"time"
)
result는 각 연산이 끝나고 돌려받는 것이다.
type result struct {
id int
op string
err error
}
func init() {
rand.Seed(time.Now().UnixNano())
}
func main() {
고루틴 수와 연산 수를 설정한다.
const routines = 10
const inserts = routines * 2
어떤 입력에 대해서든 정보를 받기 위해 버퍼 있는 채널을 연다.
ch := make(chan result, inserts)
우리가 처리해야 할 응답의 개수이다. 이 고루틴은 자신의 스택 공간(stack space)를 관리할 수 있기 때문에 WaitGroup을 쓰는 대신 지역 변수(local variable)을 WaitGroup처럼 쓸 것이다. 그러므로 시작하자마자 이 지역 변수를 입력 20개로 설정한다.
waitInserts := inserts
모든 입력을 수행한다. 이것이 바로 팬 아웃이다. 이제 우리에게는 고루틴이 10개 있다. 각 고루틴은 입력 두 개를 수행한다. 입력의 결과는 ch 채널에서 쓰인다. 이 채널은 버퍼 있는 채널이기 때문에 어떤 신호 보내기도 막히지(blcok) 않는다.
for i := 0; i < routines; i++ {
go func(id int) {
ch <- insertUser(id)
버퍼 있는 채널을 쓴 덕에 두번째 입력을 시작하기 위해 기다릴 필요가 없다. 첫번째 신호 보내기는 즉시 끝난다.
ch <- insertTrans(id)
}(i)
}
입력이 끝나면 입력 결과를 처리한다.
for waitInserts > 0 {
고루틴에서 오는 응답을 기다린다. 이건 신호 받기다. 한번에 결과 하나씩을 받고 waitInserts을 0이 될 때까지 줄일 것이다.
r := <-ch
결과를 보여준다.
log.Printf("N: %d ID: %d OP: %s ERR: %v", waitInserts, r.id, r.op, r.err)
waitInserts를 줄이고 일이 끝났는지 확인한다.
waitInserts--
}
log.Println("Inserts Complete")
}
insertUser는 데이터베이스 작업을 흉내낸다.
func insertUser(id int) result {
r := result{
id: id,
op: fmt.Sprintf("insert USERS value (%d)", id),
}
입력이 실패했는지 아닌지 무작위로 결정한다.
if rand.Intn(10) == 0 {
r.err = fmt.Errorf("Unable to insert %d into USER table", id)
}
return r
}
insertTrans도 데이터베이스 작업을 흉내낸다.
func insertTrans(id int) result {
r := result{
id: id,
op: fmt.Sprintf("insert TRANS value (%d)", id),
}
입력이 실패했는지 아닌지 무작위로 결정한다.
if rand.Intn(10) == 0 {
r.err = fmt.Errorf("Unable to insert %d into USER table", id)
}
return r
}
2020/08/24 18:18:19 N: 20 ID: 0 OP: insert USERS value (0) ERR: <nil>
2020/08/24 18:18:19 N: 19 ID: 0 OP: insert TRANS value (0) ERR: <nil>
2020/08/24 18:18:19 N: 18 ID: 1 OP: insert USERS value (1) ERR: <nil>
2020/08/24 18:18:19 N: 17 ID: 1 OP: insert TRANS value (1) ERR: <nil>
2020/08/24 18:18:19 N: 16 ID: 2 OP: insert USERS value (2) ERR: <nil>
2020/08/24 18:18:19 N: 15 ID: 2 OP: insert TRANS value (2) ERR: Unable to insert 2 into USER table
2020/08/24 18:18:19 N: 14 ID: 3 OP: insert USERS value (3) ERR: Unable to insert 3 into USER table
2020/08/24 18:18:19 N: 13 ID: 3 OP: insert TRANS value (3) ERR: <nil>
2020/08/24 18:18:19 N: 12 ID: 4 OP: insert USERS value (4) ERR: <nil>
2020/08/24 18:18:19 N: 11 ID: 4 OP: insert TRANS value (4) ERR: <nil>
2020/08/24 18:18:19 N: 10 ID: 5 OP: insert USERS value (5) ERR: <nil>
2020/08/24 18:18:19 N: 9 ID: 5 OP: insert TRANS value (5) ERR: <nil>
2020/08/24 18:18:19 N: 8 ID: 6 OP: insert USERS value (6) ERR: <nil>
2020/08/24 18:18:19 N: 7 ID: 6 OP: insert TRANS value (6) ERR: <nil>
2020/08/24 18:18:19 N: 6 ID: 7 OP: insert USERS value (7) ERR: <nil>
2020/08/24 18:18:19 N: 5 ID: 7 OP: insert TRANS value (7) ERR: Unable to insert 7 into USER table
2020/08/24 18:18:19 N: 4 ID: 8 OP: insert USERS value (8) ERR: <nil>
2020/08/24 18:18:19 N: 3 ID: 8 OP: insert TRANS value (8) ERR: <nil>
2020/08/24 18:18:19 N: 2 ID: 9 OP: insert USERS value (9) ERR: <nil>
2020/08/24 18:18:19 N: 1 ID: 9 OP: insert TRANS value (9) ERR: <nil>
2020/08/24 18:18:19 Inserts Complete
다음 예시 프로그램은 채널을 써서 프로그램이 도는 시간을 모니터링하고 너무 오래 돌면 종료시키는 방식을 보여준다.
package main
import (
"errors"
"log"
"os"
"os/signal"
"time"
)
프로그램이 일을 끝내기까지 3초의 시간을 주자.
const timeoutSeconds = 3 * time.Second
우리는 채널 4개를 쓸 것이다. 버퍼 없는 채널 3개와 버퍼 있는 채널 1개를 쓴다.
var (
sigChan은 운영 체제 신호를 받는다. 이 변수를 써서 프로그램을 깔끔하게 종료시키기 위해 Ctrl-C을 보낼 것이다.
sigChan = make(chan os.Signal, 1)
timeout은 프로그램이 돌 수 있는 시간을 제한한다. 이 채널에서 신호를 받는 일이 없었으면 좋겠다. 이 채널에서 신호를 받으면 뭔가 안 좋은 일이 생겼다는 뜻이고, 시간 초과가 일어나고, 프로그램을 종료시켜야 하기 때문이다.
timeout = time.After(timeoutSeconds)
complete는 처리가 끝났다는 것을 알리기 위해 쓴다. 이 채널이 우리가 신호를 받고 싶은 채널이다. 고루틴이 작업을 끝냈을 때 complete 채널을 통해 우리에게 신호를 줄 것이다. 어떤 에러가 일어났는지도 이 채널을 통해 알려줄 것이다.
complete = make(chan error)
shutdown은 시스템 전체 알림을 주기 위해 쓴다.
shutdown = make(chan struct{})
)
func main() {
log.Println("Starting Process")
우리는 인터럽트(interrupt) 관련 신호를 전부 받으려고 한다. signal 패키지에 있는 Notify 함수를 쓰면서 sigChan을 파라미터로 넘겨줄 것이다. 이는 sigChan 채널에게 os.Interrupt와 관련 있는 어떤 신호든지 보이면 우리에게 데이터 신호를 보내라고 말하는 것이다. 이 API에서 중요한 점은 우리가 신호를 받을 준비가 되어 있을 때까지 기다리지 않는다는 점이다. 우리가 신호를 받지 못한다면 신호는 그냥 바닥에 버려질 것이다. 여기서 1개짜리 버퍼가 있는 채널을 쓰는 이유가 바로 이것이다. 이것이 적어도 신호 1개를 받는 것을 보장받는 방법이다. 이 신호에 맞추어 행동할 준비가 되어 있을 때 우리는 신호를 받고 행동할 것이다.
signal.Notify(sigChan, os.Interrupt)
프로세스를 시작한다.
log.Println("Launching Processors")
아래 고루틴이 예컨대 이미지 처리 같은 처리를 할 것이다.
go processor(complete)
여기 있는 메인 고루틴은 이벤트 루프 안에 있고 프로그램이 종료될 때까지 무한히 루프를 돌 것이다. 고르기(select)에는 세가지 케이스가 있는데 이는 우리가 신호를 받으려고 하는 채널이 동시에 3개 있다는 뜻이다. sigChan과 timeout, complete가 있다.
ControlLoop:
for {
select {
case <-sigChan:
운영 체제에서 보낸 인터럽트(interrupt) 이벤트 신호이다.
log.Println("OS INTERRUPT")
프로세서에게 종료하라는 신호를 보내기 위해 채널을 닫는다.
close(shutdown)
이런 이벤트를 더이상 처리하지 않기 위해 채널을 nil로 설정한다.
닫힌 채널에 신호를 계속 보내려고 하면 패닉(panic)이 일어날 것이다. 닫힌 채널에서 신호를 받으려고 하면 즉시 데이터 없는 신호를 돌려받는다. nil 채널에서 신호를 받으려고 하면 영원히 막힐(block) 것이다. 신호 보내기도 비슷하다. 이렇게 하는 이유가 뭘까?
우리는 유저가 Ctrl C를 누르고 있거나 Ctrl C을 여러번 누르기를 바라지 않는다. 만약 유저가 그렇게 한다면 우리는 그 신호를 처리하고 close를 여러번 호출해야 한다. 이미 닫힌 채널에 close를 호출하면 코드는 패닉한다. 그러므로, 이런 상황에 놓이지 않기 위해 채널을 nil로 설정한다.
sigChan = nil
case <-timeout:
시간을 너무 많이 썼다. 어플리케이션을 종료시키자.
log.Println("Timeout - Killing Program")
os.Exit will terminate the program immediately.
os.Exit(1)
case err := <-complete:
아래는 주어진 시간 내에 완료된 모든 것이다.
log.Printf("Task Completed: Error[%s]", err)
여기서 우리는 레이블(label) break를 쓴다. case가 break할 수 있고 for가 break할 수 있게 하기 위해 for 루프의 맨 위에 레이블을 놓는다.
break ControlLoop
}
}
log.Println("Process Ended")
}
processor는 프로그램의 메인 로직을 담당한다. 이 파라미터에는 재밌는 부분이 있다. chan 키워드 오른쪽에 화살표가 있다. 이것은 채널이 신호 보내기 전용이라는 뜻이다. 이 채널에서 신호를 받으려고 하면 컴파일러는 에러를 뱉을 것이다.
func processor(complete chan<- error) {
log.Println("Processor - Starting")
어떤 에러가 일어나든 에러를 저장할 변수를 만든다. 클로저(closure)를 써서 defer 함수에 넘겨진다.
var err error
함수가 어떻게 끝났는지 상관없이 채널에 신호를 보내기 위해 신호 보내기를 지연(defer)시킨다. 이는 고루틴에서 보았던 것처럼 익명(anonymous) 함수 호출이다. 하지만 여기서 우리는 키워드 defer를 쓴다.
이 함수를 실행시키고 싶지만 processor가 끝난 후에 실행시키고 싶다. 이 방식은 함수를 호출한 쪽에 컨트롤이 넘어가기 전에 정해진 일이 확실히 일어난다는 보장을 해준다.
또한 defer는 패닉을 멈출 수 있는 유일한 방법이다. 안 좋은 일, 예컨대 이미지 라이브러리가 터지는 일이 생긴다면 코드 전체에 걸쳐 패닉 상황을 일으킬 것이다. 이 경우에 우리는 패닉에서 회복(recover)하고, 패닉을 멈추고, 종료를 컨트롤 하고 싶다.
defer func() {
어떤 패닉 상황이든 잡아낸다.
if r := recover(); r != nil {
log.Println("Processor - Panic", r)
}
고루틴에 종료해야 한다고 알려준다.
complete <- err
}()
작업을 수행한다.
err = doWork()
log.Println("Processor - Completed")
}
doWork는 작업을 흉내낸다. 모든 호출 사이에 우리는 checkShutdown을 호출한다. 모든 작업을 마친 후에는 다음 질문을 한다. "종료하라는 말을 들은 적이 있는가?" 이것을 아는 유일한 방법은 shutdown 채널이 닫혔는지 보는 것이다. shutdown 채널이 닫혔는지 아는 유일한 방법은 그 채널에서 신호 받기를 해보는 것이다. 닫히지 않은 채널에서 신호 받기를 하면 막힐(block) 것이다. 하지만 기본 케이스(default case)가 우리를 구해줄 것이다.
func doWork() error {
log.Println("Processor - Task 1")
time.Sleep(2 * time.Second)
if checkShutdown() {
return errors.New("Early Shutdown")
}
log.Println("Processor - Task 2")
time.Sleep(1 * time.Second)
if checkShutdown() {
return errors.New("Early Shutdown")
}
log.Println("Processor - Task 3")
time.Sleep(1 * time.Second)
return nil
}
checkShutdown은 종료 플래그를 확인해서 처리를 중단하도록 요청받은 적이 있는지 결정한다.
func checkShutdown() bool {
select {
case <-shutdown:
여기서 우리는 깔끔하게 프로그램을 종료하도록 요청받았다.
log.Println("checkShutdown - Shutdown Early")
return true
default:
shutdown 채널이 닫히지 않았다면 정상적인 처리 중이라고 가정한다.
return false
}
}
프로그램을 실행시킬 때 시간 제한을 3초로 설정했기 때문에 시간 초과가 일어나고 종료될 것이다.
2020/08/24 18:31:27 Starting Process
2020/08/24 18:31:27 Launching Processors
2020/08/24 18:31:27 Processor - Starting
2020/08/24 18:31:27 Processor - Task 1
2020/08/24 18:31:29 Processor - Task 2
2020/08/24 18:31:30 Timeout - Killing Program
exit status 1
프로그램이 돌고 있을 때 Ctrl C을 누르면 OS INTERRUPT라고 뜨고 프로그램은 일찍 종료한다.
2020/08/24 18:21:02 Starting Process
2020/08/24 18:21:02 Launching Processors
2020/08/24 18:21:02 Processor - Starting
2020/08/24 18:21:02 Processor - Task 1
^C2020/08/24 18:21:03 OS INTERRUPT
2020/08/24 18:21:04 checkShutdown - Shutdown Early
2020/08/24 18:21:04 Processor - Completed
2020/08/24 18:21:04 Task Completed: Error[Early Shutdown]
2020/08/24 18:21:04 Process Ended
Ctrt \을 눌러서 종료 신호를 보내면 모든 고루틴의 전체 스택 트레이스(stack trace)를 받을 수 있다.
2020/08/24 18:31:44 Starting Process
2020/08/24 18:31:44 Launching Processors
2020/08/24 18:31:44 Processor - Starting
2020/08/24 18:31:44 Processor - Task 1
2020/08/24 18:31:46 Processor - Task 2
^\SIGQUIT: quit
PC=0x7fff70c3e882 m=0 sigcode=0
goroutine 0 [idle]:
runtime.pthread_cond_wait(0x12201e8, 0x12201a8, 0x7ffe00000000)
/usr/local/go/src/runtime/sys_darwin.go:378 +0x39
runtime.semasleep(0xffffffffffffffff, 0x7ffeefbff678)
/usr/local/go/src/runtime/os_darwin.go:63 +0x85
runtime.notesleep(0x121ffa8)
/usr/local/go/src/runtime/lock_sema.go:173 +0xe0
runtime.stoplockedm()
/usr/local/go/src/runtime/proc.go:2068 +0x88
runtime.schedule()
/usr/local/go/src/runtime/proc.go:2469 +0x485
runtime.park_m(0xc00007cd80)
/usr/local/go/src/runtime/proc.go:2610 +0x9d
runtime.mcall(0x108ca06)
/usr/local/go/src/runtime/asm_amd64.s:318 +0x5b
goroutine 1 [select]:
main.main()
/Users/hoanhan/work/hoanhan101/ultimate-go/go/concurrency/channel_6.go:6
7 +0x278
goroutine 19 [syscall]:
os/signal.signal_recv(0x108ebb1)
/usr/local/go/src/runtime/sigqueue.go:144 +0x96
os/signal.loop()
/usr/local/go/src/os/signal/signal_unix.go:23 +0x30
created by os/signal.init.0
/usr/local/go/src/os/signal/signal_unix.go:29 +0x4f
goroutine 5 [sleep]:
runtime.goparkunlock(...)
/usr/local/go/src/runtime/proc.go:310
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:105 +0x157
main.doWork(0xc000054768, 0x1)
/Users/hoanhan/work/hoanhan101/ultimate-go/go/concurrency/channel_6.go:157 +0x14a
main.processor(0xc000096060)
/Users/hoanhan/work/hoanhan101/ultimate-go/go/concurrency/channel_6.go:138 +0xbc
created by main.main
/Users/hoanhan/work/hoanhan101/ultimate-go/go/concurrency/channel_6.go:58 +0x160
rax 0x104
rbx 0x2
rcx 0x7ffeefbff498
rdx 0x200
rdi 0x12201e8
rsi 0x20100000300
rbp 0x7ffeefbff530
rsp 0x7ffeefbff498
r8 0x0
r9 0xa0
r10 0x0
r11 0x202
r12 0x12201e8
r13 0x16
r14 0x20100000300
r15 0x10863dc0
rip 0x7fff70c3e882
rflags 0x203
cs 0x7
fs 0x0
gs 0x0
exit status 2