Right from the start, I want to emphasize that I am not a professional Go developer. If you are a newcomer to Go (like me), you might be interested in getting familiar with the concept of goroutines, and I hope this post can help you. Take a look at some basic examples here: https://gobyexample.com/goroutines.
When I first saw the concept of goroutines, I immediately fell in love with it. This was the main reason why I started occasionally programming in Go. Being a Python fanboy, it pains me to admit that Go’s approach to concurrency is clearer and easier to grasp than the async/await features recently introduced in Python 3.5.
I started to play around with goroutines and channels. My first thought was: how hard could it be to program a simple worker which receives a group of tasks, runs them concurrently, and spits out the aggregated result? With such beautiful features as goroutines and channels, it looks very easy:
The idea is straightforward (or so I mistakenly thought):
- The channel “Tasks” collects the tasks to execute.
- The worker waits for a task, reading from the channel in an infinite loop.
- When the task is received, the worker starts asynchronous execution.
- Each goroutine writes the result of execution to the “resultChannel”.
- The client collects the results by reading from the “resultChannel”.
The implementation of this could be very short. But after some consideration, I realized how error-prone this solution is.
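Here is a minimal sketch of that naive idea (this is not the final solution; apart from the tasks and resultChannel channels described above, the names are made up for illustration):
```Go
// Naive approach: one shared result channel and no clean way to stop.
tasks := make(chan func() interface{})        // tasks would be fed into this channel elsewhere
resultChannel := make(chan interface{})

// The worker reads tasks in an infinite loop and runs each one concurrently.
go func() {
    for task := range tasks {
        go func(t func() interface{}) {
            resultChannel <- t()
        }(task)
    }
}()

// The client collects the results... but when does this loop ever end?
for result := range resultChannel {
    fmt.Println(result)
}
```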
The client waits for results in an infinite loop over “resultChannel”. Eventually, you have to move on with your processing, which means closing this channel and leaving the infinite loop. But how do you know when to leave?
- Count the tasks? But what if, for some reason, a goroutine delivers no value?
- Close the “resultChannel”? But when? What if you close the channel while other goroutines are still trying to write to it? That is a panic, by the way.
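That last point is easy to demonstrate; this tiny snippet (mine, not part of the final solution) panics with “send on closed channel”:
```Go
results := make(chan int)
go func() {
    time.Sleep(time.Second)
    results <- 1 // panic: send on closed channel
}()
close(results) // the channel is closed while a goroutine still wants to write
time.Sleep(2 * time.Second)
```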
Then I found this great article on the Go Blog: https://blog.golang.org/pipelines. After reading it, I decided to implement a reusable solution that I could use later in other projects.
First of all, I have to send the result of task execution from the goroutine to the client. The result can be some value or an error. If I want to reuse my worker, the result has to be of some dynamic type. Go might not be very dynamic, but it has the empty interface: interface{}. I declare a function type which returns a value of type interface{}:
type TaskFunction func() interface{}
All my tasks have to conform to this function type. The result of the function can be a primitive type (an integer or a string), a slice, or even an error. It can be anything.
I define several tasks to be executed concurrently:
// Returns an integer after pause
slowFunction := func() interface{} {
    time.Sleep(time.Second * 2)
    fmt.Println("slow function")
    return 2
}

// Returns a string after longer pause
verySlowFunction := func() interface{} {
    time.Sleep(time.Second * 4)
    fmt.Println("very slow function")
    return "I'm ready"
}

// One function returns an error
errorFunction := func() interface{} {
    time.Sleep(time.Second * 3)
    fmt.Println("function with an error")
    return errors.New("Error in function")
}
Notice that all the tasks have the same signature, which conforms to our function type.
One idea from the original Go Blog article is that each incoming task becomes a worker with an individual output channel for its result. The values from the multiple output channels are then merged into one output channel, and the merged channel is returned to the client.
Let's implement the function which creates a new worker for an incoming task function. The channel “done” will be used to orchestrate the cancellation of task execution across the goroutines (for example, if one goroutine returns an error and you don't want to wait for the others). This channel can be of any type because we don't need to send anything through it (this will be discussed later):
func newWorker(task TaskFunction, done chan struct{}) chan interface{} {
As I said earlier, every task gets a new channel for its result:
out := make(chan interface{})
Now we are ready to implement the worker:
go func() {
    defer close(out)
    select {
    // Received a signal to abandon further processing
    case <-done:
        return
    // Got some result
    case out <- task():
    }
}()
The worker is an anonymous function launched as a goroutine. First, we take care of our output channel: we close it via a deferred call, which means the output channel will be closed as soon as the worker is done.
The “select” statement is commonly used for non-blocking reads from channels, when it is not necessary to wait for a result; that is achieved with a “default” case. But select can also be used to wait on several channels at once. In our example, the first case reads from the “done” channel, which indicates that we have to leave the worker before the task is actually done. The second case executes the task and sends its result to the output channel owned by the worker goroutine. In the end, we return the output channel. The whole function looks like this:
func newWorker(task TaskFunction, done chan struct{}) chan interface{} {
    out := make(chan interface{})
    go func() {
        defer close(out)
        select {
        // Received a signal to abandon further processing
        case <-done:
            return
        // Got some result
        case out <- task():
        }
    }()
    return out
}
As you can see, nothing in this code blocks the caller. The output channel is returned immediately, no matter how long the task takes to complete. Moreover, the output channel will eventually be closed as soon as the task is completed.
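As an aside, the non-blocking read mentioned above would look roughly like this; the snippet is only an illustration of select with a default case and is not part of the worker:
```Go
select {
case result := <-out:
    fmt.Println("got a result:", result)
default:
    fmt.Println("nothing ready yet, moving on")
}
```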
OK, so we get a channel for each task. What do we do with all of them? We need to merge the channels and return only one merged channel to the client:
func merge(workers []chan interface{}, done chan struct{}) chan interface{}
We pass in a slice with the channels of every single task and return one merged channel:
out := make(chan interface{})
Here we cannot simply close the output channel via a deferred call. Earlier we could do that because we were sure that no more than one value would ever be sent to the channel. Here we have to count the workers using Go's sync package:
var wg sync.WaitGroup
wg.Add(len(workers))
Let's define the function which takes the result from a worker and passes it to the merged channel. The code is similar to our worker, except there is no task execution: the result from one worker channel is sent on to the merged channel. When the worker channel is closed, the counter is decreased by the deferred call to wg.Done():
output := func(c <-chan interface{}) {
    defer wg.Done()
    for result := range c {
        select {
        // Received a signal to abandon further processing
        case <-done:
            return
        // some message or nothing
        case out <- result:
        }
    }
}
Then we run this function asynchronously for each worker channel in the slice:
for _, workerChannel := range workers {
    go output(workerChannel)
}
The last thing we have to take care of is to close the merged channel and return it to the client:
go func() {
    wg.Wait()
    close(out)
}()
return out
Here we use Go's WaitGroup: we wait until all workers have sent their results to the merged channel, and only then close it. Notice that this is a goroutine as well.
The whole function looks like this:
func merge(workers []chan interface{}, done chan struct{}) chan interface{} {
    // Merged channel with results
    out := make(chan interface{})
    // Synchronisation over channels: do not close "out" before
    // all tasks are completed
    var wg sync.WaitGroup
    // Define the function which waits for a result from a worker channel
    // and sends this result to the merged channel.
    // Then it decreases the counter of running tasks via wg.Done().
    output := func(c <-chan interface{}) {
        defer wg.Done()
        for result := range c {
            select {
            // Received a signal to abandon further processing
            case <-done:
                return
            // some message or nothing
            case out <- result:
            }
        }
    }
    wg.Add(len(workers))
    for _, workerChannel := range workers {
        go output(workerChannel)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}
Notice that nothing in this function blocks either.
Here is an example of how this function could be used:
- All tasks have to be collected into a slice:
tasks := []worker.TaskFunction{task1, task2, task3}
- Initialize the channel which will orchestrate canceling of task execution:
done := make(chan struct{})
defer close(done)
- Start the task execution (a sketch of what PerformTasks might look like follows after this example):
resultChannel := worker.PerformTasks(tasks, done)
- Get results from completed tasks:
```Go
for result := range resultChannel {
    switch result.(type) {
    case string:
        fmt.Println("Here is a string:", result.(string))
    case int:
        fmt.Println("Here is an integer:", result.(int))
    case error:
        fmt.Println("Here is an error:", result.(error))
    }
}
```
We block on a loop over the result channel. The result of a task execution is an interface{}, therefore you might need to convert it to a specific type. This can be achieved with a type switch.
The done channel is closed via a deferred call. That's why we don't need to send anything to it to tell all our goroutines to stop processing: closing it triggers the “case <-done” branch in all the selects you have seen in the code, which stops all running goroutines and closes all output channels.
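PerformTasks itself lives in the worker package of the repository linked below; assuming it simply wires newWorker and merge together, it would look roughly like this:
```Go
func PerformTasks(tasks []TaskFunction, done chan struct{}) chan interface{} {
    // Create a worker (with its own output channel) for every incoming task
    workers := make([]chan interface{}, 0, len(tasks))
    for _, task := range tasks {
        workers = append(workers, newWorker(task, done))
    }
    // Merge all worker channels into the single channel returned to the client
    return merge(workers, done)
}
```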
P.S. I still think that Go's concurrency concept is great :) That's it. Please find the full code on GitHub: https://github.com/nbys/asyncwork