Fan In/Out Pattern In Golang
Golang is absolutely gorgeous when it comes to concurrency. I was trying to implement fan in/out pattern in it. Following is the result.
package main
import "fmt"
func main() {
var numOfWorkers int
// How many workers do you want?
fmt.Printf("Enter the number of workers to spawn : ")
fmt.Scanf("%d\n", &numOfWorkers)
c := make(chan int)
outChanArr := make([]chan map[int]int, numOfWorkers)
done := make(chan bool)
collateChan := make(chan map[int]int)
// generate some values
go genFunc(c)
for s := 0; s < numOfWorkers; s++ {
// calculate the factorials
outChanArr[s] = fact(c, done, s)
}
// collate results from all channels
collateFunc(done, collateChan, outChanArr...)
go func() {
for s := 0; s < numOfWorkers; s++ {
<-done // syncronise the goroutines
}
close(collateChan)
}()
for i := range collateChan {
fmt.Println(i) // Print the result
}
}
func genFunc(c chan<- int) {
for i := 0; i < 100; i++ {
for j := 0; j < 20; j++ {
c <- j
}
}
close(c)
}
func fact(cin <-chan int, done chan bool, routineName int) chan map[int]int {
cout := make(chan map[int]int)
go func() {
counter := 0
for i := range cin {
out := 1
for j := i; j > 1; j-- {
out *= j
}
cout <- map[int]int{i: out}
counter++
}
fmt.Println("Goroutine fact ", routineName, "processed", counter, "items")
close(cout)
fmt.Println("Goroutine fact", routineName, "is finished")
}()
return cout
}
func collateFunc(done chan bool, collateChan chan map[int]int, c ...chan map[int]int) {
for idx, ci := range c {
go func(ci chan map[int]int, idx int) {
counter := 0
for i := range ci {
collateChan <- i
counter++
}
fmt.Println("Goroutine consume ", idx, "consumed", counter, "items")
done <- true
}(ci, idx)
}
}
As you can see here, in golang we don’t have to worry about locking if we use channels. Channels themselves act as
syncronization agent. On the other hand we can use something called Waitgroups
to achive the syncronization.
The first part is to specify the number of workers we want to crunch some numbers. numOfWorkers
stores that value.
genFunc
is the goroutine here that generates the dummy values, and pushes it on a channel c
. This task happens in
separate thread, as we have called the genFunc
with keyword go
. And the genFunc
is off and running.
go genFunc(c) // generate some values
We can consider c
as a conveyor belt on which a
little gopher called genFunc
is putting some stuff to be processed.
The for loop calls the fact
function numOfWorkers
times. It effectively spawns the numOfWorkers
goroutines parallelly,
which process the int
items on channel c
. Each of one these fact
goroutines creates a new cout
channel and puts the
result on it.
for s := 0; s < numOfWorkers; s++ {
// calculate the factorials
outChanArr[s] = fact(c, done, s)
}
If we say we that want 4 workers, 4 goroutines will be spawned, and the channel c
will be passed down. It is as if 4 gophers
are taking off the items on conveyor belt c
. This process is called fan out
. Once each of the goroutines process the items,
the output is put on cout
channel. These channels are appended to outChanArr
.
Then we collate the outChanArr
in collateFunc
and put it on collateChan
for displaying it on “standard out”. It is as if gophers
called collateFunc0, collateFunc1, collateFunc2
are gathering the output from conveyer belts outChanArr[0], outChanArr[1], outChanArr[2]...
and so on, and putting it on single conveyor belt outChanArr
. This process is called fan fan in
.
These gophers are signalling whether they are done with their tasks on done
channel.
We are checking whether all of the goroutines are finished with their tasks with anonymous goroutine, which blocks on done
channel. Once, all
of the goroutines are done we are closing collateChan
channel.
go func() {
for s := 0; s < numOfWorkers; s++ {
<-done // syncronise the goroutines
}
close(collateChan)
}()
The last line is just printing the output from channel collateChan
.
I ran the above code, and the output was,
pranav@ubuntu:~/goworkspace/src/github.com/pranav93/gopher/fan$ go run main.go
Enter the number of workers to spawn : 4
map[0:1]
map[3:6]
.
.
.
map[14:87178291200]
map[15:1307674368000]
map[18:6402373705728000]
Goroutine fact 0 processed 501 items
Goroutine fact 0 is finished
Goroutine consume 0 consumed 501 items
Goroutine fact 1 processed 497 items
Goroutine fact 1 is finished
Goroutine consume 1 consumed 497 items
Goroutine fact 2 processed 495 items
Goroutine fact 2 is finished
Goroutine consume 2 consumed 495 items
Goroutine fact 3 processed 507 items
Goroutine fact 3 is finished
Goroutine consume 3 consumed 507 items
map[19:121645100408832000]
The 2000 items are spread across 4 goroutines almost evenly.