summaryrefslogtreecommitdiff
path: root/doc/talks/io2010/balance.go
diff options
context:
space:
mode:
Diffstat (limited to 'doc/talks/io2010/balance.go')
-rw-r--r--doc/talks/io2010/balance.go168
1 files changed, 168 insertions, 0 deletions
diff --git a/doc/talks/io2010/balance.go b/doc/talks/io2010/balance.go
new file mode 100644
index 000000000..b01f7468c
--- /dev/null
+++ b/doc/talks/io2010/balance.go
@@ -0,0 +1,168 @@
+// Copyright 2010 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+ "container/heap"
+ "flag"
+ "fmt"
+ "rand"
+ "time"
+)
+
+const nRequester = 100
+const nWorker = 10
+
+var roundRobin = flag.Bool("r", false, "use round-robin scheduling")
+
+// Simulation of some work: just sleep for a while and report how long.
+func op() int {
+ n := rand.Int63n(1e9)
+ time.Sleep(nWorker * n)
+ return int(n)
+}
+
+type Request struct {
+ fn func() int
+ c chan int
+}
+
+func requester(work chan Request) {
+ c := make(chan int)
+ for {
+ time.Sleep(rand.Int63n(nWorker * 2e9))
+ work <- Request{op, c}
+ <-c
+ }
+}
+
+type Worker struct {
+ i int
+ requests chan Request
+ pending int
+}
+
+func (w *Worker) work(done chan *Worker) {
+ for {
+ req := <-w.requests
+ req.c <- req.fn()
+ done <- w
+ }
+}
+
+type Pool []*Worker
+
+func (p Pool) Len() int { return len(p) }
+
+func (p Pool) Less(i, j int) bool {
+ return p[i].pending < p[j].pending
+}
+
+func (p *Pool) Swap(i, j int) {
+ a := *p
+ a[i], a[j] = a[j], a[i]
+ a[i].i = i
+ a[j].i = j
+}
+
+func (p *Pool) Push(x interface{}) {
+ a := *p
+ n := len(a)
+ a = a[0 : n+1]
+ w := x.(*Worker)
+ a[n] = w
+ w.i = n
+ *p = a
+}
+
+func (p *Pool) Pop() interface{} {
+ a := *p
+ *p = a[0 : len(a)-1]
+ w := a[len(a)-1]
+ w.i = -1 // for safety
+ return w
+}
+
+type Balancer struct {
+ pool Pool
+ done chan *Worker
+ i int
+}
+
+func NewBalancer() *Balancer {
+ done := make(chan *Worker, nWorker)
+ b := &Balancer{make(Pool, 0, nWorker), done, 0}
+ for i := 0; i < nWorker; i++ {
+ w := &Worker{requests: make(chan Request, nRequester)}
+ heap.Push(&b.pool, w)
+ go w.work(b.done)
+ }
+ return b
+}
+
+func (b *Balancer) balance(work chan Request) {
+ for {
+ select {
+ case req := <-work:
+ b.dispatch(req)
+ case w := <-b.done:
+ b.completed(w)
+ }
+ b.print()
+ }
+}
+
+func (b *Balancer) print() {
+ sum := 0
+ sumsq := 0
+ for _, w := range b.pool {
+ fmt.Printf("%d ", w.pending)
+ sum += w.pending
+ sumsq += w.pending * w.pending
+ }
+ avg := float64(sum) / float64(len(b.pool))
+ variance := float64(sumsq)/float64(len(b.pool)) - avg*avg
+ fmt.Printf(" %.2f %.2f\n", avg, variance)
+}
+
+func (b *Balancer) dispatch(req Request) {
+ if *roundRobin {
+ w := b.pool[b.i]
+ w.requests <- req
+ w.pending++
+ b.i++
+ if b.i >= len(b.pool) {
+ b.i = 0
+ }
+ return
+ }
+
+ w := heap.Pop(&b.pool).(*Worker)
+ w.requests <- req
+ w.pending++
+ // fmt.Printf("started %p; now %d\n", w, w.pending)
+ heap.Push(&b.pool, w)
+}
+
+func (b *Balancer) completed(w *Worker) {
+ if *roundRobin {
+ w.pending--
+ return
+ }
+
+ w.pending--
+ // fmt.Printf("finished %p; now %d\n", w, w.pending)
+ heap.Remove(&b.pool, w.i)
+ heap.Push(&b.pool, w)
+}
+
+func main() {
+ flag.Parse()
+ work := make(chan Request)
+ for i := 0; i < nRequester; i++ {
+ go requester(work)
+ }
+ NewBalancer().balance(work)
+}