Skip to content

Latest commit

 

History

History
276 lines (234 loc) · 23.5 KB

13. Конкурентність.md

File metadata and controls

276 lines (234 loc) · 23.5 KB

Зміст

Конкурентність

Розподіл за повідомленнями

Паралельне програмування є великою темою й тут є місце лише для деяких специфічних для Go моментів.

Паралельне програмування у багатьох середовищах ускладнюється тонкощами, необхідними для реалізації коректного доступу до спільних змінних. Go заохочує інший підхід, при якому спільні значення передаються по каналах і, фактично, ніколи активно не використовуються окремими потоками виконання. Тільки одна підпрограма має доступ до значення в будь-який момент часу. Перегони даних не можуть відбуватися за замовчуванням. Щоб заохотити такий спосіб мислення, ми перетворили його на слоган:

Не спілкуйтеся, ділячись пам'яттю; натомість, діліться пам'яттю, спілкуючись.

Такий підхід може зайти надто далеко. Наприклад, підрахунок посилань найкраще здійснювати за допомогою м'ютексу навколо цілочисельної змінної. Але як високорівневий підхід, використання каналів для керування доступом полегшує написання зрозумілих, коректних програм.

Один зі способів зрозуміти цю модель — розглянути типову однопотокову програму, що виконується на одному процесорі. Вона не потребує примітивів синхронізації. Тепер запустіть інший такий екземпляр; він також не потребує синхронізації. Тепер дозвольте цим двом взаємодіяти; якщо взаємодія відбувається за допомогою синхронізатора, іншої синхронізації все одно не потрібно. Конвеєри (pipelines) Unix, наприклад, ідеально підходять для цієї моделі. Хоча підхід Go до конкурентності бере свій початок у комунікаційних послідовних процесах (англ. Communicating Sequential Processes — CSP) Хоара, його також можна розглядати як безпечне для типів узагальнення конвеєрів Unix.

Горутини

Вони називаються горутинами, тому що інші терміни — потоки, підпрограми, процеси й так далі — мають неточні конотації. Горутина має просту модель: це функція, що виконується конкурентно з іншими горутинами в тому ж адресному просторі. Вона легка, коштує трохи більше, ніж виділення місця в стеку. Стеки ростуть коштом виділення (і звільнення) пам'яті купи за потреби, тому вони дешеві.

Горутини мультиплексуються на декілька потоків ОС, тому якщо одна з них блокується, наприклад, під час очікування вводу/виводу, інші продовжують працювати. Їхній дизайн приховує багато складнощів у створенні та управлінні потоками.

Додайте до виклику функції або методу ключове слово go, щоб запустити виклик у новій горутині. Коли виклик завершиться, вона безшумно завершить роботу. (Ефект подібний до нотації & в командній оболонці Unix для запуску команди у фоновому режимі).

go list.Sort()  // запустити list.Sort конкурентно, не очікуючи на завершення виконання..

Функціональний літерал може бути зручним для виклику підпрограми.

func Announce(message string, delay time.Duration) {
    go func() {
        time.Sleep(delay)
        fmt.Println(message)
    }() // Зверніть увагу на дужки - функцію буде викликано.
}

У Go літерали функцій є закриттями: реалізація гарантує, що змінні, на які посилається функція, існують доти, доки вони активні.

Ці приклади не надто практичні, оскільки функції не мають способу сигналізувати про завершення виконання. Для цього нам потрібні канали.

Канали

Як і мапи, канали виділяються за допомогою make, а отримане значення діє як посилання на базову структуру даних. Якщо надається необов'язковий цілочисельний параметр, він задає розмір буфера для каналу. За замовчуванням він дорівнює нулю для небуферизованого або синхронного каналу.

ci := make(chan int)           // небуферизований канал цілих чисел
cj := make(chan int, 0)        // небуферизований канал цілих чисел
cs := make(chan *os.File, 100) // буферизований канал покажчиків на файли

Небуферизовані канали поєднують комунікацію — обмін значеннями — з синхронізацією, яка гарантує, що два обчислення (горутини) перебувають у відомому стані.

Існує багато гарних ідіом з використанням каналів. У попередньому розділі ми запустили сортування у фоновому режимі. Канал може дозволити програмі, яка запускає горутину, дочекатися завершення її виконання.

c := make(chan int) // Виділення каналу.
// Розпочати сортування в горутині; коли воно завершиться, подати сигнал на канал.
go func() {
    list.Sort()
    c <- 1 // Надіслати сигнал про завершення; значення не грає ролі.
}()
doSomethingForAWhile()
<-c // Чекати на завершення сортування; відкинути надіслане значення.

Одержувачі завжди блокуються, поки не отримають дані. Якщо канал без буфера, відправник блокується, поки одержувач не отримає значення. Якщо канал має буфер, відправник блокується тільки до тих пір, поки значення не буде скопійовано в буфер; якщо буфер переповнений, це означає очікування, поки якийсь одержувач не отримає значення.

Буферизований канал можна використовувати як семафор, наприклад, для обмеження пропускної здатності. У цьому прикладі вхідні запити передаються функції handle, яка надсилає значення в канал, обробляє запит, а потім отримує значення з каналу, щоб підготувати «семафор» для наступного споживача. Ємність буфера каналу обмежує кількість одночасних викликів функції process.

var sem = make(chan int, MaxOutstanding)

func handle(r *Request) {
    sem <- 1   // Чекати, поки активна черга не звільниться.
    process(r) // Може зайняти багато часу.
    <-sem      // Завершено; дозволити наступному запиту виконатись.
}

func Serve(queue chan *Request) {
    for {
        req := <-queue
        go handle(req) // Не чекати на завершення виконання handle.
    }
}

Коли MaxOutstanding обробників виконують process, будь-який інший процес буде блокувати спроби надсилання у заповнений буфер каналу, доки один з наявних обробників не завершить роботу і не отримає дані з буфера.

Однак, ця конструкція має проблему: Serve створює нову горутину для кожного вхідного запиту, хоча в будь-який момент може працювати лише MaxOutstanding з них. В результаті, програма може споживати необмежені ресурси, якщо запити надходять занадто швидко. Ми можемо усунути цей недолік, змінивши Serve на gate при створенні підпрограм. Це очевидне рішення, але будьте обережні, у ньому є баг, який ми згодом виправимо:

func Serve(queue chan *Request) {
    for req := range queue {
        sem <- 1
        go func() {
            process(req) // Містить помилку; див. пояснення нижче.
            <-sem
        }()
    }
}

Помилка полягає в тому, що в циклі for змінна циклу повторно використовується для кожної ітерації, тому змінна req є спільною для всіх горутин. Це не те, чого ми хочемо. Нам потрібно переконатися, що req є унікальною для кожної горутини. Ось один зі способів зробити це — передати значення req як аргумент для закриття в горутині:

func Serve(queue chan *Request) {
    for req := range queue {
        sem <- 1
        go func(req *Request) {
            process(req)
            <-sem
        }(req)
    }
}

Порівняйте цю версію з попередньою, щоб побачити різницю в тому, як оголошується і виконується закриття. Іншим рішенням є створення нової змінної з тим самим іменем, як у цьому прикладі:

func Serve(queue chan *Request) {
    for req := range queue {
        req := req // Створити новий екземпляр req для горутини.
        sem <- 1
        go func() {
            process(req)
            <-sem
        }()
    }
}

Може здатися дивним писати

req := req

Проте, це цілком доречно та ідіоматично в Go. Ви отримаєте свіжу версію змінної з тим самим ім'ям, яка навмисно затінює змінну циклу локально, але є унікальною для кожної підпрограми.

Повертаючись до загальної проблеми написання сервера, ще один підхід, який добре управляє ресурсами, полягає в тому, щоб запустити фіксовану кількість горутин для handle, які читають всі дані з каналу запитів. Кількість процедур обмежує кількість одночасних викликів process. Ця функція Serve також приймає канал, на якому їй буде вказано вийти; після запуску процедур вона блокує отримання з цього каналу.

func handle(queue chan *Request) {
    for r := range queue {
        process(r)
    }
}

func Serve(clientRequests chan *Request, quit chan bool) {
    // Запустити обробники
    for i := 0; i < MaxOutstanding; i++ {
        go handle(clientRequests)
    }
    <-quit // Чекати, поки не буде подано сигнал.
}

Канали каналів

Однією з найважливіших властивостей Go є те, що канали — це звичайні змінні, які, як і будь-які інші, можна виділяти та передавати. Ця властивість часто використовується для реалізації безпечного паралельного демультиплексування.

У прикладі з попереднього розділу метод handle був ідеалізованим обробником запиту, але ми не визначили тип, який він обробляв. Якщо цей тип містить канал для відповіді, то кожен клієнт може надати свій власний шлях для неї. Ось схематичне визначення типу Request.

type Request struct {
    args       []int
    f          func([]int) int
    resultChan chan int
}

Клієнт надає функцію та її аргументи, а також канал всередині об'єкта запиту, по якому потрібно отримати відповідь.

func sum(a []int) (s int) {
    for _, v := range a {
        s += v
    }
    return
}

request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
// Надіслати запит.
clientRequests <- request
// Очікувати на відповідь.
fmt.Printf("answer: %d\n", <-request.resultChan)

На стороні сервера змінюється лише функція обробника.

func handle(queue chan *Request) {
    for req := range queue {
        req.resultChan <- req.f(req.args)
    }
}

Очевидно, що ще багато чого можна зробити, аби це було реалістичнішим, але цей код є фреймворком для паралельної системи RPC з обмеженою швидкістю, що не блокує, і тут немає жодного м'ютексу.

Паралелілзм

Іншим застосуванням цих ідей є розпаралелювання обчислень на декількох ядрах процесора. Якщо обчислення можна розбити на окремі частини, які можуть виконуватися незалежно, його можна розпаралелити, створивши канал для сигналізації про завершення кожної частини.

Припустимо, що нам потрібно виконати дорогу операцію над вектором елементів, і що вартість операції над кожним елементом є незалежною, як у цьому ідеалізованому прикладі.

type Vector []float64

// Застосувати операцію до v[i], v[i+1] ... аж до v[n-1].
func (v Vector) DoSome(i, n int, u Vector, c chan int) {
    for ; i < n; i++ {
        v[i] += u.Op(v[i])
    }
    c <- 1 // сигнал, що ця частина завершена
}

Ми запускаємо фрагменти незалежно в циклі, по одному на кожне ядро процесора. Вони можуть завершуватися в будь-якому порядку, але це не має значення; ми просто рахуємо сигнали завершення, звільняючи канал після запуску всіх горутин.

const numCPU = 4 // Кількість ядер процесора.

func (v Vector) DoAll(u Vector) {
    c := make(chan int, numCPU) // Буферизація необов'язкова, але є розумним кроком.
    for i := 0; i < numCPU; i++ {
        go v.DoSome(i*len(v)/numCPU, (i+1)*len(v)/numCPU, u, c)
    }
    // Очищення каналу.
    for i := 0; i < numCPU; i++ {
        <-c // Чекати на завершення одного завдання.
    }
    // Все зроблено.
}

Замість того, щоб створювати константне значення для numCPU, ми можемо запитати у часу виконання, яке значення є відповідним. Функція runtime.NumCPU повертає кількість апаратних ядер процесора у машині, тому ми можемо написати

var numCPU = runtime.NumCPU()

Існує також функція runtime.GOMAXPROCS, яка повідомляє (або встановлює) вказану користувачем кількість ядер, на яких може одночасно виконуватися програма. За замовчуванням вона має значення runtime.NumCPU, але його можна перевизначити, встановивши однойменну змінну або викликавши функцію з додатним числом. Виклик функції з нульовим значенням просто запитує значення. Тому, якщо ми хочемо задовольнити запит користувача на отримання ресурсу, ми повинні написати

var numCPU = runtime.GOMAXPROCS(0)

Не плутайте ідеї конкурентності — структурування програми як незалежно виконуваних компонентів — і паралелізму — виконання обчислень паралельно для ефективності на декількох ядрах процесора. Хоча можливості конкурентності в Go дозволяють легко структурувати деякі проблеми як паралельні обчислення, Go є конкурентною мовою, а не паралельною, і не всі проблеми розпаралелювання підходять під модель Go. Для обговорення цієї різниці дивіться доповідь, на яку є посилання у цьому пості з блогу (англ.).

Поточний буфер

Інструменти конкурентного програмування можуть навіть полегшити вираження неконкурентних концептів. Ось приклад, абстрагований від пакета RPC. Клієнтська горутина циклічно отримує дані з деякого джерела, можливо, з мережі. Щоб уникнути виділення та звільнення буферів, вона зберігає вільний список і використовує буферизований канал для його представлення. Якщо канал порожній, виділяється новий буфер. Як тільки буфер повідомлення буде готовий, його буде надіслано на сервер за допомогою serverChan.

var freeList = make(chan *Buffer, 100)
var serverChan = make(chan *Buffer)

func client() {
    for {
        var b *Buffer
        // Взяти буфер, якщо він доступний; виділити новий, якщо ні.
        select {
        case b = <-freeList:
            // Отримано; більше нічого не треба робити.
        default:
            // Вільних буферів немає, тому виділити новий.
            b = new(Buffer)
        }
        load(b)         // Прочитати наступне повідомлення з мережі.
        serverChan <- b // Відправити серверу.
    }
}

Серверний цикл отримує кожне повідомлення від клієнта, обробляє його і повертає буфер у вільний список.

func server() {
    for {
        b := <-serverChan // Чекати на роботу.
        process(b)
        // Повторно використати буфер, якщо є місце.
        select {
        case freeList <- b:
            // Буфер у списку вільних; більше нічого не треба робити.
        default:
            // Список вільних буферів заповнений, продовжуємо.
        }
    }
}

Клієнт намагається отримати буфер з freeList; якщо його немає, він виділяє новий. Відправлення сервером запиту до freeList повертає b назад до вільного списку, якщо тільки список не заповнено, у цьому випадку буфер скидається, щоб його забрав збирач сміття. (Умова default в операторах select виконується, коли жоден інший випадок не задовільнений, що означає, що select ніколи не блокується). Ця реалізація будує список без дірявих відер всього за кілька рядків, покладаючись на буферизований канал і збирач сміття для ведення обліку.