Good Code vs Bad Code in Golang

Recently, I was asked to detail what makes good code or bad code in Golang. I found this exercise very interesting; interesting enough, actually, to write a post about it. To illustrate my answer, I have taken a concrete use case I faced in the Air Traffic Management (ATM) domain. The project is available on GitHub.

Context

First, a few words to explain the context of the implementation.

Eurocontrol is the organization managing air traffic across European countries. The common network for exchanging data between Eurocontrol and an Air Navigation Service Provider (ANSP) is called AFTN. This network is mainly used to exchange two different message types: ADEXP and ICAO messages. Each message type has its own syntax, but in terms of semantics both types are (more or less) equivalent. Given the context, performance must be a key element of the implementation.

This project provides two Go implementations for parsing ADEXP messages (ICAO is not handled in the frame of this exercise):

  • A bad implementation (package name: bad)
  • A refactored implementation (package name: good)

An example of an ADEXP message can be found here.

In the frame of this exercise, the parsers handle only a subset of the fields we can find in an ADEXP message. Yet, it is still relevant to illustrate common Go mistakes.

Parsing

In a nutshell, an ADEXP message is a set of tokens. A token type can be either:

  • Simple

-ARCID ACA878

Meaning the ARCID (aircraft identifier) is ACA878.

  • Repeating

-EETFIR EHAA 0853
-EETFIR EBBU 0908

This example is a list of FIR (Flight Information Region). The first FIR is EHAA 0853 whereas the second one is EBBU 0908.

  • Complex

-GEO -GEOID GEO01 -LATTD 490000N -LONGTD 0500000W
-GEO -GEOID GEO02 -LATTD 500000N -LONGTD 0400000W

A repeating list of tokens. Each line contains a sublist of tokens (in this example GEOID, LATTD, LONGTD).

Given the context, it is important to implement a version leveraging parallelization. So the algorithm is the following:

  • A preprocessing step to clean and rearrange the input message (we have to clean potential white spaces, rearrange multi-line tokens like COMMENT, etc.)
  • Then splitting the work so that each line is handled in its own goroutine. Each goroutine is in charge of processing one line and returning the result.
  • Last but not least, gathering the results and returning a Message structure. This structure is a common one regardless of the message type (ADEXP or ICAO).

Each package contains an adexp.go file exposing the main function ParseAdexpMessage().
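
To give a rough idea of the overall shape, here is a minimal, simplified sketch of that flow. It reuses the preprocess, mapLine and Message names discussed below, leaves the per-token enrichment as a comment, and does not claim to be the project's actual code:

func ParseAdexpMessage(in []byte) (Message, error) {
    // Step 1: clean and rearrange the raw input into logical lines
    lines, err := preprocess(in)
    if err != nil {
        return Message{}, err
    }

    // Step 2: process each line in its own goroutine
    ch := make(chan interface{})
    for _, line := range lines {
        go mapLine(line, ch)
    }

    // Step 3: gather the results in the parent goroutine and build the Message
    msg := Message{}
    for range lines {
        data := <-ch
        switch data.(type) {
        // Enrich msg depending on the concrete token type received
        }
    }
    return msg, nil
}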

Step-by-step comparison

Let’s now see step by step what I consider as a bad code and how I refactored it.

String vs []byte

The bad implementation handles only string inputs. As Go offers strong support for byte operations (basic operations like trim, regexp, etc.) and the input will most likely be a []byte (considering AFTN messages are received through TCP), there is actually no good reason to force a string input.
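
As a quick, standalone illustration (not taken from the project), the standard library offers the same kind of primitives on []byte as on string:

package main

import (
    "bytes"
    "fmt"
    "regexp"
)

func main() {
    in := []byte("  -ARCID ACA878  ")
    trimmed := bytes.TrimSpace(in)                        // trim works directly on the byte slice
    token := regexp.MustCompile(`-[A-Z]+`).Find(trimmed)  // so do regular expressions
    fmt.Printf("%s\n", token)                             // prints -ARCID
}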

Error management

The error management is pretty terrible in the bad implementation.
We can find cases where a potential error returned as the second value is not even handled:

preprocessed, _ := preprocess(string)

The good implementation deals with each potential error:

preprocessed, err := preprocess(bytes)
if err != nil {
return Message{}, err
}

We can also find some mistakes in the bad implementation like in the following code:

if len(in) == 0 {
return "", fmt.Errorf("Input is empty")
}

The first mistake is a stylistic one: according to Go conventions, an error string should neither be capitalized nor end with punctuation.
The second mistake: when an error string is a simple constant (no formatting required), a call to errors.New() is slightly more performant than fmt.Errorf().

The good implementation looks like:

if len(in) == 0 {
	return nil, errors.New("input is empty")
}

Avoid nesting

The mapLine() function is a good example of avoidable nesting calls. The bad implementation:

func mapLine(msg *Message, in string, ch chan string) {
    if !startWith(in, stringComment) {
        token, value := parseLine(in)
        if token != "" {
            f, contains := factory[string(token)]
            if !contains {
                ch <- "ok"
            } else {
                data := f(token, value)
                enrichMessage(msg, data)
                ch <- "ok"
            }
        } else {
            ch <- "ok"
            return
        }
    } else {
        ch <- "ok"
        return
    }
}

By contrast, the good implementation is flat:

func mapLine(in []byte, ch chan interface{}) {
    // Filter empty lines and comment lines
    if len(in) == 0 || startWith(in, bytesComment) {
        ch <- nil
        return
    }

    token, value := parseLine(in)
    if token == nil {
        ch <- nil
        log.Warnf("Token name is empty on line %v", string(in))
        return
    }

    sToken := string(token)
    if f, contains := factory[sToken]; contains {
        ch <- f(sToken, value)
        return
    }

    log.Warnf("Token %v is not managed by the parser", string(in))
    ch <- nil
}

This makes the code easier to read in my opinion. Furthermore, this flat representation should also be applied to error management. As an example:

a, err := f1()
if err == nil {
    b, err := f2()
    if err == nil {
        return b, nil
    } else {
        return nil, err
    }
} else {
    return nil, err
}

Should be replaced by:

a, err := f1()
if err != nil {
    return nil, err
}
b, err := f2()
if err != nil {
    return nil, err
}
return b, nil

Once again, the second code version is easier to read.

Passing data by reference or by value

The signature of the preprocessing function in the bad implementation is:

func preprocess(in string) (string, error) {
}

Given the context of this project (performance does matter) and considering a message can potentially be quite heavy, a better option would have been to pass a string pointer instead. Otherwise, the in string is copied on each call.

The good implementation does not face this problem as it deals with slices (a simple 24-byte structure regardless of the underlying data):

func preprocess(in []byte) ([][]byte, error) {
}

More generally speaking, the choice between passing data by reference or by value should not be made out of habit.
Passing data by value also helps to make sure a function will not cause any side effects (like mutating the data passed as input). This has several benefits, for example when unit testing or when refactoring code for parallelization (otherwise we need to check each subfunction to see whether a mutation is made).

I do believe such a choice must be made carefully, depending on the project context.
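
As a purely illustrative fragment (the config type and both functions are hypothetical, not part of the project), passing a struct by value shields the caller from mutations, whereas passing a pointer does not:

type config struct {
    title string
}

// byValue receives a copy: mutating it has no effect on the caller's value.
func byValue(c config) {
    c.title = "changed"
}

// byReference receives a pointer: mutating it is a visible side effect.
func byReference(c *config) {
    c.title = "changed"
}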

Parallelization

The bad implementation is based on a good initial idea: leveraging goroutines to parallelize the data processing (one goroutine per line).

This is achieved in the bad implementation by iterating over the number of lines and spawning a mapLine() call in a goroutine.

for i := 0; i < len(lines); i++ {
    go mapLine(&msg, lines[i], ch)
}

The mapLine() function takes three parameters:

  • A pointer to the final Message structure to be returned. It means each mapLine() will enrich the same variable.
  • The current line
  • A channel used for sending a notification once the processing of the line is done

Sending a pointer to a shared Message variable breaks one of the main Go principles:

Don’t communicate by sharing memory, share memory by communicating.

There are two main drawbacks to passing this shared variable:

  • Drawback #1: Slices concurrent modifications

Because the structure contains some slices which can be modified concurrently (by two or more goroutines at the same time), in the bad implementation we had to deal with mutexes.

For example, the Message structure contains an Estdata []estdata field.
Modifying the slice by appending another estdata must be done this way:

mutexEstdata.Lock()
for _, v := range value {
    fl := extractFlightLevel(v[subtokenFl])
    msg.Estdata = append(msg.Estdata, estdata{v[subtokenPtid], v[subtokenEto], fl})
}
mutexEstdata.Unlock()

Actually, except for very specific use cases, having to use a mutex in a goroutine might be a code smell.

  • Drawback #2: False sharing

Sharing memory across threads/goroutines is not a good idea due to potential false sharing (a cache line in a given CPU core cache can be invalidated by another CPU core cache). This means we should avoid as much as possible sharing the same variable across threads/goroutines if they intend to mutate it.

In this very example, though, I don’t think false sharing has a huge impact as the input file is quite light (running a performance test with padding fields in the Message structure gives more or less the same result). Yet, that’s always something important to bear in mind in my opinion.

Let’s now see how the good implementation deals with parallelization:

for _, line := range in {
    go mapLine(line, ch)
}

Now, mapLine() takes only two inputs:

  • The current line
  • A channel. This time the channel is not used simply to send a notification once a line has been processed, but to send the actual result. It means it is not up to the goroutines to modify the final Message structure.

Gathering the results is done this way by the parent goroutine (the one spawning the mapLine() calls in separate goroutines):

msg := Message{}

for range in {
    data := <-ch

    switch data.(type) {
        // Modify msg variable
    }
}

This implementation is more aligned, in my opinion, with the Go principle of sharing memory only by communicating. The Message variable is modified by a single goroutine, which prevents potential concurrent slice modifications and false sharing.

One potential criticism, even with the good code, is spawning a goroutine for each line. Such an implementation works because an ADEXP message will not contain thousands of lines. Yet, the simple one-line-one-goroutine approach does not scale well under very high throughput. A better option would have been to create a pool of reusable goroutines, for example.

But again, in the context of an ADEXP message parsing, one line triggering one goroutine might be good enough.

Line processing notification

In the bad implementation, as described above, once a line has been processed by mapLine(), we must notify the parent goroutine. This is done using a chan string and the call:

ch <- "ok"

As the parent does not actually check the value sent on the channel, a better option would have been to use a chan struct{} with ch <- struct{}{}, or a chan interface{} with ch <- nil (which is what the good implementation ends up needing anyway, since the channel also carries the results).

Another approach (even cleaner in my opinion) would have been to use a sync.WaitGroup, as the parent goroutine just needs to continue its execution once every mapLine() is done.
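
A minimal sketch of that alternative, assuming a lines variable holding the preprocessed lines and ignoring the result gathering for simplicity:

var wg sync.WaitGroup
for _, line := range lines {
    wg.Add(1)
    go func(line []byte) {
        defer wg.Done()
        // Process the line here
    }(line)
}
// The parent goroutine resumes only once every line has been processed
wg.Wait()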

If

The Go if statement accepts an initialization statement before the condition.

An improved version of:

f, contains := factory[string(token)]
if contains {
    // Do something
}

Can be the following implementation:

if f, contains := factory[sToken]; contains {
    // Do something
}

It slightly improves the code readability.

Switch

Another mistake in the bad implementation is forgetting the default case in the following switch:

switch simpleToken.token {
case tokenTitle:
    msg.Title = value
case tokenAdep:
    msg.Adep = value
case tokenAltnz:
    msg.Alternate = value 
// Other cases
}

The default case can be omitted if the developer has thought about all the possible cases. Yet, it is definitely better to catch the unexpected case, as in the following example:

switch simpleToken.token {
case tokenTitle:
    msg.Title = value
case tokenAdep:
    msg.Adep = value
case tokenAltnz:
    msg.Alternate = value
// Other cases    
default:
    log.Errorf("unexpected token type %v", simpleToken.token)
    return Message{}, fmt.Errorf("unexpected token type %v", simpleToken.token)
}

Handling the default case helps catch potential developer mistakes as early as possible in the development process.

Recursion

The parseComplexLines() function parses a complex token. In the bad code, the algorithm is implemented using recursion:

func parseComplexLines(in string, currentMap map[string]string, 
	out []map[string]string) []map[string]string {

    match := regexpSubfield.Find([]byte(in))

    if match == nil {
        out = append(out, currentMap)
        return out
    }

    sub := string(match)

    h, l := parseLine(sub)

    _, contains := currentMap[string(h)]

    if contains {
        out = append(out, currentMap)
        currentMap = make(map[string]string)
    }

    currentMap[string(h)] = string(strings.Trim(l, stringEmpty))

    return parseComplexLines(in[len(sub):], currentMap, out)
}

Yet, Go does not perform tail-call elimination to optimize such recursive calls. The good code produces the exact same result using an iterative algorithm:

func parseComplexToken(token string, value []byte) interface{} {
    if value == nil {
        log.Warnf("Empty value")
        return complexToken{token, nil}
    }

    var v []map[string]string
    currentMap := make(map[string]string)

    matches := regexpSubfield.FindAll(value, -1)

    for _, sub := range matches {
        h, l := parseLine(sub)

        if _, contains := currentMap[string(h)]; contains {
            v = append(v, currentMap)
            currentMap = make(map[string]string)
        }

        currentMap[string(h)] = string(bytes.Trim(l, stringEmpty))
    }
    v = append(v, currentMap)

    return complexToken{token, v}
}

The second version is therefore more performant than the first one.

Constants management

We need a constant value to distinguish ADEXP and ICAO messages. The bad code does it this way:

const (
    AdexpType = 0 // TODO constant
    IcaoType  = 1
)

Whereas the good code uses a more elegant solution based on iota:

const (
    AdexpType = iota
    IcaoType 
)

It produces exactly the same result but it reduces potential developer mistakes.

Receiver functions

Each parser provides a function to determine whether a message concerns the upper level (at least one route point above the level 350).

The bad code implements it this way:

func IsUpperLevel(m Message) bool {
    for _, r := range m.RoutePoints {
        if r.FlightLevel > upperLevel {
            return true
        }
    }

    return false
}

Meaning we have to pass a Message as an input to the function.
Whereas the good code is simply a function with a Message receiver:

func (m *Message) IsUpperLevel() bool {
    for _, r := range m.RoutePoints {
        if r.FlightLevel > upperLevel {
            return true
        }
    }

    return false
}

The second approach is preferable. We simply indicate that the Message struct implements a specific behavior.

It might also be a first step to using Go interfaces. For example, if someday we need to create another structure with the same behavior (IsUpperLevel()), the initial code does not even need to be refactored (as Message already implements this behavior).
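
For illustration only, such an interface could be as small as this (the name is hypothetical and does not exist in the project):

// LevelChecker is implemented by any message type exposing IsUpperLevel().
type LevelChecker interface {
    IsUpperLevel() bool
}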

Comments

This one is pretty obvious, but the bad code is poorly commented.

On the other hand, I tried to comment the good code as I would do in a real project. Even though I’m not the kind of developer who likes to comment every single line, I still believe it is important to comment at least each function and the main steps of a complex function.

As an example:

// Split each line in a goroutine
for _, line := range in {
    go mapLine(line, ch)
}

msg := Message{}

// Gather the goroutine results
for range in {
    // ...
}

A concrete example in addition to a function comment can also be very useful:

// Parse a line by returning the header (token name) and the value. 
// Example: -COMMENT TEST must return COMMENT and TEST (in byte slices)
func parseLine(in []byte) ([]byte, []byte) {
    // ...
}

Such concrete examples can really help another developer in better understanding an existing project.

Last but not least, according to Go best practices, the package itself is also commented:

/*
Package good is a library for parsing the ADEXP messages.
An intermediate format Message is built by the parser.
*/
package good

Logging

Another obvious example is the lack of logs produced in the bad code. As I’m not a fan of the standard log package, I used an external library called logrus in this project.
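
For illustration, a typical logrus call (assuming the github.com/sirupsen/logrus package imported under the log alias) attaches structured fields to a log entry:

package main

import (
    log "github.com/sirupsen/logrus"
)

func main() {
    // The field gives context to the warning without string concatenation
    log.WithFields(log.Fields{
        "parser": "ADEXP",
    }).Warn("token name is empty")
}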

go fmt

Go provides a set of powerful tools like go fmt. Unfortunately, we forgot to apply it to the bad code whereas it was done on the good code.
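
For reference, applying it to a whole repository is a one-liner from its root (go fmt ./... rewrites the files in place, gofmt -l . simply lists the files that are not properly formatted):

go fmt ./...
gofmt -l .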

DDD

DDD brings the concept of ubiquitous language to emphasize the importance of a shared language between all the project stakeholders (business experts, dev, testers etc.).
This cannot really be measured in this example, but keeping a simple structure like Message consistent with the language spoken inside a bounded context is also good for the overall maintainability of the project.

Performance results

On an i7-7700 4x 3.60 GHz, I ran a benchmark test to compare both parsers:

  • Bad implementation: 60430 ns/op
  • Good implementation: 45996 ns/op

The bad code is more than 30% slower than the good one.
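
Such numbers typically come from Go's built-in benchmarking support. A minimal sketch (assuming a _test.go file in the good package and an input variable holding a sample ADEXP message) looks like this, and go test -bench=. reports the ns/op figures compared above:

func BenchmarkParseAdexpMessage(b *testing.B) {
    for i := 0; i < b.N; i++ {
        _, _ = ParseAdexpMessage(input)
    }
}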

Conclusion

It is pretty difficult, in my opinion, to give a general definition of what bad and good code are. Code considered good in one context might be considered bad in another.

The first obvious characteristic of good code is to provide a correct solution according to the given functional requirements. Code can be as performant as we like, but if it does not fit the requirements, it is pretty useless.

Meanwhile, it is important for a developer to care about writing simple, maintainable and performant code.

Performance improvements do not materialize out of thin air; they usually come at the cost of increased code complexity.

A good developer is someone able to find the right balance between these characteristics according to a given context.

Just like in DDD, context is key 🙂

An Introduction To gosiris, An Actor Framework For Go

This post is an introduction to an actor framework for Golang: gosiris. First of all, I will introduce the context, then we will dig into the framework and its capabilities.

Go

Go (or golang) is a programming language created by Google 8 years ago. It is compiled, statically typed, not object-oriented (even though it provides interfaces to define a set of methods) and has a garbage collector.

Initially, I started to learn Go because of its capabilities in terms of concurrent programming.

I think Node.js is not the best system to build a massive server web, I would use Go for that – Ryan Dahl, creator of Node.js

Actually, Go is based on the Communicating Sequential Processes (CSP) principle. In a nutshell, CSP is a concurrency model that avoids sharing memory across processes. Instead, the idea is to implement sequential processes that communicate through channels.

This principle is summarized by Rob Pike, co-creator of Go:

Don’t communicate by sharing memory; share memory by communicating – Rob Pike
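
A tiny, self-contained illustration of this principle in Go: the two goroutines never touch a shared variable, they only exchange a value through a channel.

package main

import "fmt"

func main() {
    ch := make(chan string)

    // A sequential process sending its result through the channel
    go func() {
        ch <- "hello from another goroutine"
    }()

    // The main goroutine receives the value instead of reading shared memory
    fmt.Println(<-ch)
}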

The actor model

The core idea of the actor model is the same as CSP, namely to avoid sharing memory between processes and to favor message passing instead. Yet there are two main differences.

The first one is that CSP is purely synchronous, meaning a channel writer is blocked until a channel reader reads the message. This limitation was tackled in Golang, though, with the introduction of non-blocking channels (the so-called buffered channels).
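
For illustration, a buffered channel in Go lets a writer proceed as long as the buffer is not full:

ch := make(chan string, 2) // buffer of two messages
ch <- "first"              // does not block
ch <- "second"             // does not block
// A third send would block until a reader consumes a message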

The second main difference is that in the CSP model, the processes are somehow anonymous. A channel writer has only a reference to a channel without actually knowing who will receive the message. The actor model, though, is a point-to-point integration. An actor has a reference to another actor (through its identifier for example) to communicate with it.

Most actor frameworks also implement a hierarchy concept between actors. This simple idea is actually really powerful. A parent actor, for example, could be directly notified of a child actor failure and then decide on a failure strategy (restart the child? fail itself to implement a kind of circuit breaker? etc.).

gosiris

gosiris is an actor framework for Go allowing local or remote communications between actors and providing runtime discoverability plus distributed tracing.

The remote communications can be done using either an AMQP broker or Kafka. The runtime discoverability is achieved using an etcd registry and the distributed tracing is based on Zipkin.

Hello world example

We will see in the following example how to implement a local send-only interaction.

The first step is to create an actor system:

gosiris.InitActorSystem(gosiris.SystemOptions{
    ActorSystemName: "ActorSystem",
})
defer gosiris.CloseActorSystem()

We will see later on that an actor system can be distributed simply by configuring some options.

Then it’s time to create and register an actor:

parentActor := gosiris.Actor{}
defer parentActor.Close()

gosiris.ActorSystem().RegisterActor("parentActor", &parentActor, nil)

Each actor has a logical name, here parentActor.

Then we will create and register a child actor, this time implementing a specific behavior for a given message type (message):

childActor := gosiris.Actor{}
defer childActor.Close()

childActor.React("message", func(context gosiris.Context) {
    context.Self.LogInfo(context, "Received %v\n", context.Data)
})

gosiris.ActorSystem().SpawnActor(&parentActor, "childActor", &childActor, nil)

An actor can be composed of multiple reactions. Each reaction is based on an event type and must define a specific behavior by implementing a simple function with a context parameter.

Finally, to send a message from the parent to the child:

parentActorRef, _ := gosiris.ActorSystem().ActorOf("parentActor")
childActorRef, _ := gosiris.ActorSystem().ActorOf("childActor")

childActorRef.Tell(gosiris.EmptyContext, "message", "Hi! How are you?", parentActorRef)

As you can see, we have not used the parentActor or childActor variables to send a message. This is not possible in gosiris. Instead, we first do a lookup in the actor system using the actor identifiers (ActorOf(string)). Each lookup returns an ActorRef structure, which is simply a reference to an actor.

As a summary:

package main

import (
    "gosiris/gosiris"
)

func main() {
    gosiris.InitActorSystem(gosiris.SystemOptions{
        ActorSystemName: "ActorSystem",
    })

    parentActor := gosiris.Actor{}
    defer parentActor.Close()

    childActor := gosiris.Actor{}
    defer childActor.Close()
    childActor.React("message", func(context gosiris.Context) {
        context.Self.LogInfo(context, "Received %v\n", context.Data)
    })

    gosiris.ActorSystem().RegisterActor("parentActor", &parentActor, nil)
    gosiris.ActorSystem().SpawnActor(&parentActor, "childActor", &childActor, nil)

    parentActorRef, _ := gosiris.ActorSystem().ActorOf("parentActor")
    childActorRef, _ := gosiris.ActorSystem().ActorOf("childActor")

    childActorRef.Tell(gosiris.EmptyContext, "message", "Hi! How are you?", parentActorRef)
}

Stateful actor

It is also possible to manage a stateful actor like in the following example:

type StatefulActor struct {
    gosiris.Actor
    someState interface{}
}

statefulActor := new(StatefulActor).React("someEvent", func(context gosiris.Context) {
    //Some behavior
})

Actor supervision

As we’ve seen, one of the benefits of the actor model is the ability to implement a hierarchy of actors. In the following example, a parent actor is automatically notified of a child failure:

actor.React(gosiris.GosirisMsgChildClosed, func(context gosiris.Context) {
    context.Self.LogInfo(context, "My child is closed")
})

The parent actor can then decide on the strategy to adopt and decide for example to stop itself:

context.Self.AskForClose(context.Self)

Become/unbecome

In gosiris, it is also possible to implement the become/unbecome pattern familiar from Akka, for example:

angry := func(context gosiris.Context) {
    if context.Data == "happy" {
        context.Self.LogInfo(context, "Unbecome\n")
        context.Self.Unbecome(context.MessageType)
    } else {
        context.Self.LogInfo(context, "Angrily receiving %v\n", context.Data)
    }
}

happy := func(context gosiris.Context) {
    if context.Data == "angry" {
        context.Self.LogInfo(context, "I shall become angry\n")
        context.Self.Become(context.MessageType, angry)
    } else {
        context.Self.LogInfo(context, "Happily receiving %v\n", context.Data)
    }
}

actor := gosiris.Actor{}
defer actor.Close()
actor.React("context", happy)

At first, the actor is configured to react with the happy behavior. Then, depending on the context, it may change its behavior at runtime to become angry and later unbecome it (reverting to happy).

Distributed example

Let’s see now how to create a distributed actor system:

gosiris.InitActorSystem(gosiris.SystemOptions{
    ActorSystemName: "ActorSystem",
    RegistryUrl:     "http://etcd:2379",
    ZipkinOptions: gosiris.ZipkinOptions{
        Url:      "http://zipkin:9411/api/v1/spans",
        Debug:    true,
        HostPort: "0.0.0.0",
        SameSpan: true,
    },
})
defer gosiris.CloseActorSystem()

Here we referenced an etcd server for runtime discoverability and a Zipkin server for distributed tracing.

In addition, we will implement a request/reply interaction with one actor deployed on AMQP while the other will be deployed on Kafka:

actor1 := new(gosiris.Actor).React("reply", func(context gosiris.Context) {
    context.Self.LogInfo(context, "Received: %v", context.Data)
})
defer actor1.Close()
gosiris.ActorSystem().RegisterActor("actor1", actor1, new(gosiris.ActorOptions).
    SetRemote(true).
    SetRemoteType(gosiris.Amqp).
    SetUrl("amqp://guest:guest@amqp:5672/").
    SetDestination("actor1"))

actor2 := new(gosiris.Actor).React("context", func(context gosiris.Context) {
    context.Self.LogInfo(context, "Received: %v", context.Data)
    context.Sender.Tell(context, "reply", "hello back", context.Self)
})
defer actor2.Close()
gosiris.ActorSystem().SpawnActor(actor1, "actor2", actor2, new(gosiris.ActorOptions).
    SetRemote(true).
    SetRemoteType(gosiris.Kafka).
    SetUrl("kafka:9092").
    SetDestination("actor2"))

We simply changed the call to RegisterActor() by adding actor options. Once an actor is defined as remote, we must at least configure the transport type (AMQP or Kafka), a URL and a destination. Every time a new actor is registered, each actor system receives a notification and updates its internal actor map.

Last but not least, because the actor system has been initialized with a Zipkin reference, gosiris will also manage the distributed tracing part. It means that each time an interaction is started (a tell with an empty context), a new parent span is created. Each reaction then automatically becomes a child span, and all logs are also forwarded to the Zipkin collector.

Conclusion

As you have seen in the last example, in fewer than 30 effective lines of code, we implemented with gosiris a distributed request/reply interaction between two remote actors, automatically discovered at runtime, with Zipkin providing distributed tracing.

If you are interested in gosiris, feel free to contact me @teivah.

Event Sourcing And Concurrent Updates

When we have to deal with concurrent updates, we generally think about two possibilities:

  • Pessimistic locking: an object is explicitly locked by a system
  • Optimistic concurrency control: the system determines whether another client has changed the object before updating it. There are several possible implementations, like using a CAS or providing an expected version of the object.

The higher the chance of a concurrent update, the more it makes sense to favor a pessimistic locking approach. Indeed, if an optimistic transaction fails, the cost of retrying it may be very expensive (back and forth to the client, delay, retry, etc.).

In addition, an Event Sourcing strategy can be a good solution to deal with highly concurrent environments.

A common strategy with Event Sourcing is to implement something on top of an optimistic concurrency control. For example, a client requests a change on a given object by providing an expected version. If the request fails because the current version is greater than the expected one, instead of naively throwing an error, the system can:

  • First, retrieve the event(s) persisted since the expected version
  • Then check whether these event(s) are really conflicting from a business perspective

Let’s take the following example:

Two events are already persisted (CustomerCreated and FamilySituationUpdated). At this moment, the current version of the customer is v2.

Let’s now imagine the system receives two concurrent updates with the same expected version (v3). The first event, ContactInfoUpdated, will be persisted because the version matches, but what about the NewContractSubscribed one?

As described above, the system can retrieve all the events persisted since v3 (here only ContactInfoUpdated) and determine whether they are actually conflicting or not. In this example, we can imagine that adding a new contract does not conflict with a contact information update, so the system can commit the transaction.

To determine whether an event is in conflict with a set of already persisted events, there are several strategies.

We can, for example, implement a blacklist strategy: each event type also registers a set of conflicting event types. If any of these events are found, the transaction is not committed.
Furthermore, we can also decide to do it with a custom conflict checker function. Bear in mind, though, that one of the benefits of this strategy is to be faster than a simple optimistic concurrency one. If our function takes too long to execute (and the chances of concurrent updates are high), it will impact the system throughput.
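
As an illustration only (the names and event types are hypothetical), a blacklist strategy could be sketched in Go along these lines:

// conflictsWith registers, for each event type, the event types it conflicts with.
var conflictsWith = map[string][]string{
    "NewContractSubscribed": {"ContractCancelled"},
    "ContactInfoUpdated":    {"ContactInfoUpdated"},
}

// isConflicting checks whether a new event conflicts with the events
// persisted since the expected version.
func isConflicting(newEvent string, persistedSince []string) bool {
    blacklist := conflictsWith[newEvent]
    for _, persisted := range persistedSince {
        for _, conflicting := range blacklist {
            if persisted == conflicting {
                return true
            }
        }
    }
    return false
}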

Another possible strategy to deal with concurrent updates could be to vary the data model granularity. For example, instead of having a single event stream for a customer, we could split up its representation into several streams (e.g. GeneralInformation and ContractInformation). Then we have to apply an optimistic concurrency control to each stream (GeneralInformation has its own version and ContractInformation also has its own).

Yet, this solution may have some limitations. Indeed, we must guarantee that the conflict resolution is stream-autonomous, meaning the events stored in one stream are enough to guarantee the integrity of our whole object.

On the other hand, a coarse-grained representation (e.g. one stream per customer) may trigger the conflict resolution mechanism too often (and, once again, impact the system throughput). Therefore, the granularity decision remains a very important one, regardless of our strategy.

Running Kafka 1.0 In Docker

With the recent release of Kafka 1.0.0, I have just released the Docker image teivah/kafka (forked from spotify/kafka).

You simply need to run the following command by replacing <DOCKER_HOST> with your Docker host IP:

docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=<DOCKER_HOST> --env ADVERTISED_PORT=9092 teivah/kafka

The Docker image will instantiate a Zookeeper server (port 2181) and a Kafka 1.0.0 broker (port 9092).

To create a Kafka producer you must run the following command in your Docker instance:

/opt/kafka_2.11-1.0.0/bin/kafka-console-producer.sh --broker-list <DOCKER_HOST>:9092 --topic test

And for a Kafka consumer:

/opt/kafka_2.11-1.0.0/bin/kafka-console-consumer.sh --bootstrap-server <DOCKER_HOST>:9092 --topic test

An Introduction To Reactive Streams

As you may know, Java 9 is about to be released on September 21st. Among many new features, the Reactive Streams integration was probably the one I was expecting the most. The goal of this very post is to introduce the original Reactive Streams initiative. As a second step in another post, I will describe what Java 9 will bring.

The original initiative

Reactive Streams is an initiative started in 2013 with different companies like Netflix, Pivotal, Lightbend, etc. The core goal is to provide a standard for asynchronous stream processing with non-blocking back pressure.

To explain what the concept of back pressure is, let us take a concrete example. Your organization is composed of legacy applications. Some of them are, most likely, running on old technologies with limited scalability and performance capabilities. Then you put in place a new revolutionary system, capable of handling thousands of messages per second. If this new system communicates synchronously with some of your legacy applications (without any throttling mechanism provided by a service gateway in between), those legacies could simply crash at some point due to the number of incoming requests sent by the new system. This scenario might seem a bit far-fetched but actually, I did experience it.

To tackle this problem, the core idea of back pressure is to not force a subscriber system to buffer arbitrary amounts of data sent by a publisher.

As mentioned, the back pressure problem could also be tackled by putting a service gateway or messaging middleware in between. For example, if you have a JMS middleware between a publisher and a subscriber, the request buffering is not done by the subscriber itself. Yet, from my perspective, the main benefit of Reactive Streams is the capability to achieve back pressure without being forced to use any middleware.

Basically, a subscriber can indicate directly to a publisher how many messages it will be able to manage:

  • During the initial subscription
  • And each time a new message is received (received does not necessarily mean processed)

We could also argue that the buffering problem is simply shifted somewhere else, onto the publisher side. As an example, if the publisher is a GUI application that must manage incoming user requests, then it is up to this very application to handle the buffering. From a design point of view, it does make sense to buffer them on the publisher and not on the subscriber. Furthermore, in the case of a synchronous interaction, some frameworks release the initial request object in the publisher JVM (meaning it becomes available for garbage collection) only once the reply has been sent by the subscriber. So in this case, the buffering is even done twice.

The Reactive Streams API

The Reactive Streams working group released an API for several languages (Java and .NET; the JavaScript API has not been released yet). This API is simply composed of four different interfaces (which belong to the org.reactivestreams package for Java):

Subscription

A simple interface acting as a kind of channel description and providing two methods:

  • cancel(): Used by a subscriber to cancel its subscription
  • request(long): Used by a subscriber to request n messages from a Publisher

Publisher

A generic interface providing a single subscribe(Subscriber) method. At first glance, considering the original publish/subscribe pattern, it might seem strange for a publisher to subscribe to a subscriber. But this is actually the strength of Reactive Streams: the capability for a publisher to adapt its flow of data to a subscriber.

Subscriber

A generic interface providing four methods:

  • onComplete(): Notify the subscriber when a publisher is closed
  • onError(Throwable): Notify the subscriber when a publisher is closed with an error state
  • onNext(T): Notify the subscriber that a message was sent. If everything goes right, after having processed a message, the subscriber will then usually invoke Subscription.request(long).
  • onSubscribe(Subscription): Notify the subscriber that a publisher just started a subscription (through the Publisher.subscribe(Subscriber) method). This is also a way for the subscriber to keep a reference to the provided Subscription object.

Processor

A simple interface extending both the Publisher and Subscriber interfaces. In other words, a Processor can act as both a Publisher and a Subscriber.

API summary

Bear in mind that a data stream does not necessarily have an end. So the sequence of actions is usually the following:

onSubscribe onNext* (onError | onComplete)?

So far, several actors have released Reactive Streams compliant implementations, like Akka, Vert.x, Reactor, RxJava, etc. Nonetheless, Java 9 itself is not compatible with the original API, as the four interfaces are redefined in the java.util.concurrent package. I will describe this point in more detail in an upcoming post.

Reactive Streams IO

You may have noticed that so far I was solely speaking about an API, not a protocol (just like JMS vs AMQP, for instance). And obviously, an API alone does not bring interoperability across languages.

To tackle this, another working group is (was?) in charge of defining a protocol on top of network protocols (either uni- or bi-directional, like TCP, WebSockets, HTTP/2, etc.). Unfortunately, it seems there is not much activity on the GitHub project, even though the first RC was initially planned for 2015. We can easily imagine this is not an easy job at all, but I am looking forward to getting some news.

When to use Reactive Streams?

From my perspective, I can imagine two applications of Reactive Streams.

First, you have to protect a legacy system exposing a synchronous API (as described in the example above) from the rest of the world. You can implement a kind of Back Pressure Layer (BPL) acting as a facade on top of the legacy. This is obviously not the best possible application, as the request buffering is still not kept at the publisher level. Yet this could be a smarter implementation than the dumb throttling policy you could implement with a service gateway.

Second, you want to implement a system based on reactive principles (responsiveness, resiliency, elasticity and message-driven). In that case, Reactive Streams might appear as a de facto standard for component interactions without being forced to use a load-centric service or messaging middleware.

Conclusion

Reactive Streams does appear as a compelling solution to handle back pressure. Furthermore, the fact that Java 9 integrated this concept is definitely a step ahead for the original Reactive Streams initiative.

In my next post, I am going to describe what Java 9 will bring in more detail.
