반응형

https://gosudaweb.gitbooks.io/effective-go-in-korean/content/concurrency.html

 

동시성 · Effective Go in Korean

results matching "" No results matching ""

gosudaweb.gitbooks.io

한국어 번역 사이트가 있어서 해당 경로를 참조하시길 바랍니다. 

 goroutines과 channels를 공부하기 위해서 golang 사이트 effective go의 concurrency에 대한 부분을 살펴봅니다. 

https://golang.org/doc/effective_go.html#concurrency

 

Effective Go - The Go Programming Language

Effective Go Introduction Go is a new language. Although it borrows ideas from existing languages, it has unusual properties that make effective Go programs different in character from programs written in its relatives. A straightforward translation of a C

golang.org

Share by communicating

 동시성 프로그래밍은 광범위한 주제이므로 여기에서는 Go에 한정된 중요한 것들에 대해서만 지면을 할애한다.

 많은 환경에서 동시성 프로그래밍은 공유 변수에 대한 적절한 접근을 구현하는 것이 필요하기 때문에 어렵습니다.

 go는 공유 자원이 채널을 통해서 전달되는 다른 접근 방식을 권장합니다.

 별도의 수행 스레드가 능동적으로 공유하지 않습니다. 

 오직 하나의 goroutines만 주어진 시간에 값에 접근할 수 있습니다.

 설계에 의해서 Data race는 발생하지 않습니다. 이러한 사고방식을 권장하기 위해서 슬로건으로 줄였습니다. 

 "메모리를 공유해서 통신하지 마십시오; 대신에 통신으로 통해서 메모리를 공유하세요"

 이 접근방식은 너무 멀리 갈 수 있습니다.

 참조 카운트는 예를 들어 정수 변수 주변에서 mutex를 두는 것이 가장 좋을 수 있습니다.

 그러나 높은 수준의 접근방식(복잡한 공유객체)은 채널을 사용해서 접근을 관리하는 것이 명확하게 올바른 프로그램을

 쉽게 작성 할 수 있습니다.

 이 모델에 대해서 생각하는 한 가지 방법은 하나의 cpu에서 실행되는 전형적인 단일 스레드 프로그램을

 고려하는 것입니다.

 이것은 동기화가 필요 하지 않습니다. 다른 인스턴스를 실행 하십시오. 이것 또한 동기화가 필요하지 않습니다. 

 이제 두 인스턴스들이 통신을 시킵니다. 만약에 통신이 동기 장치여도 여전치 동기화가 필요하지 않습니다. 

 예를 들면 Unix 파이프라인은 이 모델에 완벽하게 일치 합니다. 

 Go의 동시성 접근은 Hoare의 CSP(Communicating Sequential Processes) 안에서 유래했지만 

 type-safe generalization of Unix pipes로 볼 수도 있습니다. (CSP에 대해서 찾아보기)

 ( gouroutines과 channel은 이러한 방식으로 소통한다는 뜻인 것 같습니다.)

Goroutines

 스레드, 코루틴, 프로세스 등의 기존 용어로 설명할 수 없기 때문에 goroutines이라고 부릅니다.

 goroutines은 같은 주소공간 안에서 다른 goroutines과 동시에 실행되는 기능에 대해서 간단한 모델을 가집니다. 

 goroutines은 가볍고 stack 공간 할당보다 약간의 비용이 더 듭니다.

 그리고 스택이 작게 시작해서 저렴하고 필요에 따라서 힙으로 부터 할당해서 확장할 수 있습니다. 

 goroutines은 여러개의 os의 스레드에 의해서 다중처리가 되므로

 만약에 하나 goroutines이 I/O를 대기와 같은 이유로 Block이 되더라도

 다른 goroutines들은 멈추지 않고 수행이 됩니다. 

 이러한 설계는 스레드의 생성 및 관리에 대한 복잡성을 감춥니다.(사용자는 신경 쓰지 않아도 됨)

 새로운 goroutines을 실행시키기 위해서는 go 키워드를 접두어 붙여 함수나 메서드를 호출합니다.

 호출이 완료될 때 goroutines은 자동으로 종료됩니다.

 (Unix 쉘에서 백그라운드에서 실행시키기 위한 &과 비슷한 효과입니다.)

// run list.Sort concurrently; 이것을 기다리지 마라
go list.Sort()  

 함수 리터럴은 goroutines 호출에 유용 할 수 있습니다.

func Announce(message string, delay time.Duration) {
    go func() {
        time.Sleep(delay)
        fmt.Println(message)
    }()  // Note the parentheses - must call the function.
}

 go안에서 함수 리터럴은 클로져입니다.

 : 실행은 함수가 참조하는 변수가 활성화 되어 있는 동안 생존할 수 있게 만들어 줍니다.

 이 예제는 함수가 완료 신호를 보낼 방법이 없기 때문에 실용적이지 않습니다. 신호를 보내려면 채널이 필요합니다. 

 (채널에 대해서는 다음 글에서 설명 드립니다.)

Channels

 map과 같이 channels은 make로 할당되며 그 결과 값은 데이터 구조체의 참조합니다. 

 만약 옵션으로 정수 값을 제공하면 channel의 버퍼 사이즈를 세팅합니다. 

 channel의 기본값은 zero이며 unbuffered이며 동기적 채널입니다. 

ci := make(chan int)            // unbuffered channel of integers
cj := make(chan int, 0)         // unbuffered channel of integers
cs := make(chan *os.File, 100)  // buffered channel of pointers to Files

 unbuffered channels은 알려진 상태 안에서 두 계산(goroutines)의 값의 교환(동기화를 보장)하는 통신으로

 결합됩니다. 

 channels을 사용하는 아주 좋은 방식이 있습니다. 이전 섹션에서 우리는 백그라운드 안에서 정렬을 수행했습니다. 

 channels은 수행 중인 goroutines이 정렬의 완료를 기다리게 할 수 있습니다. 

c := make(chan int)  // channel 할당
// goroutines안에서 정렬을 시작합니다. 
// 정렬이 완료되면 채널은 신호를 보냅니다.
go func() {
    list.Sort()
    c <- 1  // 신호 전달; 값은 중요하지 않습니다.
}()
doSomethingForAWhile()
<-c   // 정렬이 끝나기는 것을 기다립니다.; 전달받은 값은 버립니다.

 수신자(Receiver)는 언제나 데이터를 전달받을 때까지 Block 됩니다. 만약 채널이 unbuffered라면 송신자(Sender)는

 수신자가 값을 받을 때까지 Block됩니다. 만약 채널이 Buffered라면 송신자는 값이 버퍼에 복사될 때까지만 차단합니다.

 만약에 버퍼가 가득 찼다면 송신자는 어떤 수신자가 버퍼에서 값을 수신할 때까지 기다린다는 것을 의미합니다. 

 buffer 채널은 예를 들면 처리량을 제한하기 위해서 semaphore(락 획득 개수를 가진 mutex와 비슷)처럼

 사용될 수 있습니다.

 예제에서 Handle을 통해서 들어오는 요청 전달되면 채널에 값을 보내고 요청을 처리하고 채널에서 값을 받아서

 다음 소비자를 위한 "Semaphore"를 준비합니다. 채널의 버퍼 용량은 동시 처리할 호출 수를 제한합니다.

// MaxOutstanding 동시 수행 제한 수
var sem = make(chan int, MaxOutstanding)

func handle(r *Request) {
    sem <- 1    // 활성화된 버퍼가 비워질때까지 기다립니다.
    process(r)  // 시간이 오래걸릴수 있습니다.
    <-sem       // 완료; 다음 요청을 실행 할 수 있도록 설정.
}

func Serve(queue chan *Request) {
    for {
        req := <-queue
        go handle(req)  // 핸들이 끝날때까지 기다리지 마세요.
    }
}

 한 번에 MaxOutstanding 숫자의 handle을 수행하면 기존의 handle 중 하나가 끝나고 버퍼로부터 수신할 때까지

 가득 채워진 버퍼로의 전송을 차단합니다.

 이 디자인은 문제가 있습니다. Serve는 들어오는 요청마다 새로운 goroutines을 만들지만 언제나

 MaxOutstanding 개수의goroutines만을 수행할 수 있습니다. 결과적으로 이 프로그램은 만약 요청이 너무 빠르면 

 무제한의 자원을 소비할 수 있습니다.

 우리는 goroutines 생성하는 통로로 Serve를 변경하여 문제를 해결할 수 있습니다.

 여기 확실한 해결책이 있습니다. 하지만 버그가 있으니 주의하십니다.(다음 예제에서 해결될)

func Serve(queue chan *Request) {
    for req := range queue {
        sem <- 1
        go func() {
            process(req) // Buggy; 아래 설명을 참조하세요
            <-sem
        }()
    }
}

 버그는 go의 for 루프 안에서 루프 변수는 각 반복에 재사용합니다. 그래서 "req" 변수는 모든 goroutines에서

 공유됩니다. 이것은 우리가 원하는 것이 아닙니다. 우리는 "req"가 각 goroutine마다 고유하게 만들어지길 원합니다.

 그것을 수행할 한 가지 방법이 있습니다. goroutines의 클로져에 대한 인수로 "req"를 값으로 전달합니다. 

func Serve(queue chan *Request) {
    for req := range queue {
        sem <- 1
        go func(req *Request) {
            process(req)
            <-sem
        }(req)
    }
}

 클로져의 선언 및 실행 방법의 차이점을 보려면 이 버전과 이전 버전을 비교하세요 

 또 다른 해결 방법은 같은 이름의 새로운 변수를 생성하는 것입니다. 아래 예제와 같이

func Serve(queue chan *Request) {
    for req := range queue {
        req := req // goroutine을 위한 새로운 req 생성
        sem <- 1
        go func() {
            process(req)
            <-sem
        }()
    }
}

 이렇게 작성하는 것이 이상하게 보일 수도 있습니다.

req := req

 그러나 go안에서 해당 구현은 자연스럽고 합법적입니다. 같은 이름을 가진 새로운 버전의 변수를 얻습니다. 

 고의적으로 루프 변수를 로컬변수로 변화시키지만 각 goroutine은 고유한 변수를 전달 받습니다. 

 서버 작성의 일반적인 문제로 돌아가서 자원을 잘 관리하는 또 다른 접근 방식은 채널의 모든 요청을 처리하는

 goroutines의 숫자를 고정된 크기로 시작하는 것입니다. 

 goroutines의 수는 동시에 호출되는 process의 수를 제한합니다. 

 이 Serve함수는 종료 메시지를 전달하는 채널 또한 허용합니다. goroutines이 시작 후에는 해당 채널로부터

 메시지를 받기 위해 Block됩니다. 

func handle(queue chan *Request) {
    for r := range queue {
        process(r)
    }
}

func Serve(clientRequests chan *Request, quit chan bool) {
    // Start handlers
    for i := 0; i < MaxOutstanding; i++ {
        go handle(clientRequests)
    }
    <-quit  // 종료 될 때까지 기다립니다. 
}

Channels of channels

 go의 가장 중요한 특징중 하나는 channels이 다른 것들과 마찬가지로 할당과 전달이 가능한

 일급 객체(first class object)라는 것입니다.

 이 속성의 일반적인 사용은 parallel demultiplexing을 안전하게 구현하는 것입니다. 

 이전 섹션 예제에서 handle함수는 요청에 대한 이상적인 처리기였지만 처리되는 타입에 대해서는

 정의하지 않았습니다. 만약 타입에 요청에 대한 응답할 채널이 포함되어 있다면 각 클라이언트들은 응답을 

 자신만의 경로로 제공할 수 있습니다. 이것은 Request 타입에 대한 정의입니다

type Request struct {
    args        []int
    f           func([]int) int
    resultChan  chan int
}

 클라이언트는 함수와 인자뿐만 아니라 응답을 수신 할 채널을 Request 객체안에 제공할 수 있습니다.

func sum(a []int) (s int) {
    for _, v := range a {
        s += v
    }
    return
}

request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
// request 전송
clientRequests <- request
// response 기다림
fmt.Printf("answer: %d\n", <-request.resultChan)

 서버쪽에서 Handler함수만 변경됩니다.

func handle(queue chan *Request) {
    for req := range queue {
        req.resultChan <- req.f(req.args)
    }
}

 이것을 현실적으로 적용하기 위해서는 해야 할 일이 많습니다. 그러나 이 코드는 rate-limited, 병렬,

 Non blocking RPC시스템의 프레임 워크이며 mutex가 없습니다.

Parallelization

 이러한 아이디어의 또 다른 적용은 여러 CPU 코어 사이에서 계산을 병렬화하는 것입니다. 

 만약 계산을 독립적으로 실행 할 수 있는 작은 조각으로 나눌 수 있다면 각 조각들이 완료 될때 신호를 보내는 채널과

 함께 병렬로 수행 될 수 있습니다. 

 vecter의 items들이 수행에 비싼 비용을 가진다고 가정하고 이상적인 예제와 같이 각 item의 작업값은 독립적입니다.

type Vector []float64

// Apply the operation to v[i], v[i+1] ... up to v[n-1].
func (v Vector) DoSome(i, n int, u Vector, c chan int) {
    for ; i < n; i++ {
        v[i] += u.Op(v[i])
    }
    c <- 1    // 이 부분 작업이 끝나면 신호 보냄
}

CPU당 하나씩 루프에서 독립적인 부분 작업을 시작합니다. 그것들은 순서에 관계없이 끝나지만 중요하지 않습니다.

모든 goroutines이 시작한 후에 채널을 통해서 완료신호를 세면 됩니다.

const numCPU = 4 // number of CPU cores

func (v Vector) DoAll(u Vector) {
    c := make(chan int, numCPU)  // 버퍼링은 선택이지만 효율적입니다.
    for i := 0; i < numCPU; i++ {
        go v.DoSome(i*len(v)/numCPU, (i+1)*len(v)/numCPU, u, c)
    }
    // Drain the channel.
    for i := 0; i < numCPU; i++ {
        <-c    // wait for one task to complete
    }
    // All done.
}

 CPU 수에 대한 상수값을 생성하는 것 대신에 런타임에 적절한 값을 물어볼 수 있습니다.

 rumtime.NumCPU 함수는  장비 안의 하드웨어 CPU갯수를 반환하므로 아래와 같이 작성할 수 있습니다.

var numCPU = runtime.NumCPU()

 또한 runtime.GOMAXPROC 함수는 Go프로그램에서 동시에 수행할 수 있는 사용자 지정 코어의 수를 보고합니다.

 이것의 기본값은 runtime.NumCPU이지만 정수와 함께 호출하거나 비슷한 이름의 쉘 환경변수를 설정해서 재정의할 수

 있습니다.

 0을 인자로 호출하면 요청이 됩니다. 그러므로 만약에 사용자 자원 요청을 존중하려면 아래와 같이 작성할 수있습니다. 

var numCPU = runtime.GOMAXPROCS(0)

 parallelism(여러개의 CPU에서 효율성을 위해서 병렬로 계산을 수행하는 것 )과

 concurrency(프로그램을 독립적으로 실행하는 구성요소로 구조화 하는것 ) 아이디어를 혼동하지 마세요.

 go의 concurrency기능으로 일부 문제를 병렬 계산으로 쉽게 구성할 수 있지만 Go는 concurrent 언어이며

 parallel 언어는 아니고 모든 parallelization문제가 go에 맞는 건 아닙니다. 

 차이점에 대한 내용은 이 블로그를 참조했습니다. this blog post.

A leaky buffer

 동시성 프로그래밍 도구를 사용하면 비 동시성 아이디어를 쉽게 표현 할 수 있습니다. 

 이것은 RPC 패키지에서 추상화된 예제입니다.

 어떤 자원, client goroutine 루프는 아마도 네트워크 로부터 데이터를 전달 받고 있다.

 버퍼의 할당과 해제를 회피하기 위해서 free list를 유지하고 그것을 위해서 buffered된 channel을 사용합니다. 

 만약에 channel이 비어 있다면 새로운 buffer를 할당합니다. 메시지 buffer가 준비되면 serverChan을 통해서 

 서버로 전송됩니다. 

var freeList = make(chan *Buffer, 100)
var serverChan = make(chan *Buffer)

func client() {
    for {
        var b *Buffer
        // Grab a buffer if available; allocate if not.
        select {
        case b = <-freeList:
            // Got one; nothing more to do.
        default:
            // None free, so allocate a new one.
            b = new(Buffer)
        }
        load(b)              // Read next message from the net.
        serverChan <- b      // Send to server.
    }
}

server의 루프는 클라이언트로부터 각각 메시지를 수신해서 처리하고 free list에 buffer를 반환합니다. 

func server() {
    for {
        b := <-serverChan    // 작업을 기다립니다. 
        process(b)
        // 공간이 있으면 buffer 재사용
        select {
        case freeList <- b:
            // free list에 Buffer 넣음; 아무것도 할일이 없다
        default:
            // Free list 가득참, just carry on.
        }
    }
}

 클라이언트는 freeList로부터 buffer를 회수하는 것을 시도합니다. 만약 사용가능한 것이 없다면 새로운 buffer를

 할당합니다. server는 free list가 가득 차지 않는 한 freeList로 b를 넣고 send합니다. 

 어떤 경우에는 버퍼를 가비지 수집기에서 회수하기 위해서 버립니다.

 (다른 경우가 준비되지 않으면 select문의 기본 구문이 수행되기 때문에 select는 절대 block되지 않습니다.)

 이 구현은 가비지 수집기와 buffered channels을 사용해서 단 몇줄만으로 leaky bucket free list을 만듭니다.

반응형

+ Recent posts