Vert.x: Understanding Core Concepts

Today, a deep dive into a framework for building reactive applications on the JVM: Vert.x.
In this post we will cover the main concepts of Vert.x and provide examples for developers who want to see some concrete implementations.

The source code of each example is available at https://github.com/teivah/vertx-tutorial

Overview

Vert.x is a project started in 2011 by Tim Fox. It is still an official Eclipse Foundation project, but it is now led by Red Hat, which sees Vert.x as a strategic project.

Vert.x is a JVM-based framework that allows developers to implement autonomous components. These components communicate with each other in a weakly-coupled way by sending messages over an Event Bus.

Vert.x offers a polyglot approach; developers are free to implement their components in any of the supported languages: Java, JavaScript, Groovy, Ruby, and Ceylon.

Vert.x also provides the building blocks for implementing reactive applications based on a non-blocking, callback-oriented approach.

Vert.x is often described as the Node.js of the JVM, but we will see the main differences between the two frameworks.

Reactor pattern and event loop

Like Node.js, which implemented and popularized it, Vert.x is based upon the Reactor pattern.

The Reactor pattern is a solution:

  • To handle concurrent events (a connection, a request, a message, etc.).
  • Based upon a synchronous and blocking demultiplexer (also known as event loop) listening for incoming events.
  • Dispatching events to appropriate handlers if an operation can be executed in an asynchronous fashion.

A handler is generally implemented to manage operations such as I/O, database connections, HTTP requests, etc.

This pattern is often used to offer a solution to the C10k problem. This problem might be described very simply: how can a server handle ten thousand connections at the same time?

In such conditions, the classical approach of creating one dedicated thread per request is no longer viable due to physical and/or OS limitations (far too much thread context switching, for instance).
In comparison, the Reactor pattern offers a model with a single-threaded event loop.

Yes. One thread to manage 10k connections. If you are not familiar with such concepts, this is pretty disruptive, isn’t it?
One thread to manage incoming events and dispatch them to asynchronous handlers.
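To make the idea concrete, here is a toy single-threaded event loop in plain Java. This is a deliberately naive model, not Vert.x code: one thread drains a queue of events and dispatches each to the handler registered for its type.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Toy single-threaded event loop: one thread takes events from a queue
// and dispatches each one to the handler registered for its type.
public class ToyEventLoop {
	private final BlockingQueue<String[]> queue = new ArrayBlockingQueue<>(16);
	private final Map<String, Consumer<String>> handlers = new HashMap<>();

	public void on(String type, Consumer<String> handler) {
		handlers.put(type, handler);
	}

	public void emit(String type, String payload) throws InterruptedException {
		queue.put(new String[] {type, payload});
	}

	// Runs on a single thread; a blocked handler would stall every event
	public void runOnce(int events) throws InterruptedException {
		for (int i = 0; i < events; i++) {
			String[] event = queue.take();
			Consumer<String> handler = handlers.get(event[0]);
			if (handler != null) {
				handler.accept(event[1]);
			}
		}
	}

	public static void main(String[] args) throws InterruptedException {
		ToyEventLoop loop = new ToyEventLoop();
		List<String> out = new ArrayList<>();
		loop.on("request", payload -> out.add("handled " + payload));
		loop.emit("request", "conn-1");
		loop.emit("request", "conn-2");
		loop.runOnce(2);
		System.out.println(out);
	}
}
```

Because every handler runs on the same thread, one slow handler delays all other events, which is exactly why the event loop must never be blocked.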

The Vert.x approach is slightly different though. Let's imagine that you want to scale up your machine by adding more CPU cores. If you keep one single thread, you will not benefit from these new resources.

With Node.js, this limitation is usually bypassed by running one process per core.

In comparison, Vert.x allows you to configure and define the number of instantiated event loops. This is a powerful option: a common best practice for maximizing the performance of your application is to configure as many event loop instances as there are CPU cores available.
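As a configuration sketch (assuming the Vert.x 3.x VertxOptions API), the event loop pool can be sized when the Vertx instance is created:

```java
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;

// Size the event loop pool to the number of available CPU cores
VertxOptions options = new VertxOptions()
		.setEventLoopPoolSize(Runtime.getRuntime().availableProcessors());
Vertx vertx = Vertx.vertx(options);
```

Note that this only sizes the pool the event loops are drawn from; the number of instances actually used by a verticle is set at deployment time, as shown in the deployment examples at the end of this post.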

It is worth saying that, as a developer, you have to be very careful with the way you manage the event loop.
With the classical approach, if one thread is blocked, it is of course not a good thing, but it will not impact the other threads.
With the reactor approach, a blocked thread is simply a disaster: it impacts all ongoing and upcoming events.

In addition, Vert.x offers a wide set of non-blocking APIs. You can use Vert.x for non-blocking web operations (based upon Netty), filesystem operations, data access (for instance JDBC, MongoDB and PostgreSQL), etc.
There is also a standard way, described below, to manage a blocking operation directly from an event loop.

Verticle model

As mentioned in the overview, Vert.x proposes a model based upon autonomous and weakly-coupled components.
This type of component is called a Verticle in Vert.x and can be written in any of the languages listed above. Each verticle has its own classloader.

It is not mandatory to build Vert.x applications out of verticles, but it is strongly recommended to adopt this model.

A verticle is basically an object implementing two methods: start() and stop().
There are three different types of verticles:

  • Standard: Verticles based upon the event loop model. As mentioned, Vert.x allows you to configure the number of event loop instances.
  • Worker: Verticles based upon the classical model. A pool of threads is defined, and each incoming request consumes one available thread.
  • Multi-threaded worker: An extension of the basic worker model. A multi-threaded worker is managed by one single instance but can be executed concurrently by several threads.

The examples below show concretely how to configure and instantiate each verticle type.

The Verticle approach has some similarities with the Actor model but is not a full implementation of it.
I will not dig too deep into these concepts (it would be a nice idea for a future post, though, to present in detail the differences between Vert.x and an Actor-compliant framework such as Akka). If we just stick to the three main principles of an Actor given in the Reactive Messaging Patterns with the Actor Model book:

  • Send a finite number of messages to other actors: as we will see in the next part, verticles communicate with each other in an asynchronous fashion using messages.
  • Create a finite number of new actors: in my understanding, this is not the philosophy of Vert.x; each verticle is deployed by a central process. Having concrete dependencies between verticles to form a supervision system, where a parent supervises its children, is not the model proposed by Vert.x.
  • Designate the behavior to be used for the next message it receives: still according to my understanding, verticles are designed to be stateless components. They do not maintain state in order to modify, for instance, their behavior in preparation for future messages. It would still be possible to implement this manually, but there is no Vert.x standard for doing it.

Event Bus

The last core concept is the Event Bus. As you can imagine, the Event Bus is the support for exchanging messages between the different verticles in a weakly-coupled fashion.

Each message is sent to a logical address. For example: customer.create

The event bus supports the two standard pattern types: point-to-point and publish / subscribe. With the point-to-point pattern, you can also implement request-reply if the recipient must reply to the initial message.

One important thing to understand: the event bus implements only best-effort delivery.
This means there is no guaranteed delivery (for instance, by persisting messages to disk), so it is genuinely possible to lose messages.
Likewise, there is no support for durable subscribers: if a consumer is not connected to the Event Bus when a message is published, it will never be able to retrieve it.

Within your Vert.x application, the Event Bus can be the standard channel for synchronous and asynchronous exchanges that do not require guaranteed delivery.
Otherwise, you should use a third-party messaging middleware.

If your Vert.x application runs on a single JVM node, there is no physical channel; the bus remains a simple on-heap messaging endpoint.
If your application is clustered, an event bus endpoint is exposed over a TCP channel, but this logic remains invisible to the developer: there is no difference in the way you use the send/publish API. Vert.x manages that for you.

One last precision: as of the existing versions (up to 3.2.1), the Event Bus TCP communication is not encrypted. Based on the roadmap, encryption will be implemented from version 3.3 (~ June 2016).

Let’s code!

Enough theory for this post. Let’s see now some concrete examples:

  • In the first and second examples we will see the basic concepts needed to implement a REST API.
  • In the third one we will describe how it is possible to compose asynchronous methods.
  • In the fourth example we will see how to publish, send and receive messages on the Event Bus.
  • The last example will show how to create and deploy verticles.

Create a basic REST service

Github example

In this example, we will create a simple method on a REST resource.

public class Sample extends AbstractVerticle {

	// Start method
	@Override
	public void start(Future<Void> startFuture) {
		// Create a router object allowing to route HTTP request
		final Router router = Router.router(vertx);

		// Create a new get method listening on /hello resource with a parameter
		router.route(HttpMethod.GET, "/hello/:name").handler(routingContext -> {
			// Retrieving request and response objects
			HttpServerRequest request = routingContext.request();
			HttpServerResponse response = routingContext.response();

			// Get the name parameter
			String name = request.getParam("name");

			// Manage output response
			response.putHeader("Content-Type", "text/plain");
			response.setChunked(true);
			response.write("Hello " + name);
			response.setStatusCode(200);
			response.end();
		});

		// Create an HTTP server listening on port 8080
		vertx.createHttpServer().requestHandler(router::accept).listen(8080, result -> {
			if (result.succeeded()) {
				// If the HTTP server is deployed, we consider the Verticle as
				// correctly deployed
				startFuture.complete();
			} else {
				// If not, we invoke the fail method
				startFuture.fail(result.cause());
			}
		});
	}

	// Stop method
	@Override
	public void stop(Future<Void> stopFuture) throws Exception {
		// Do something
		stopFuture.complete();
	}
}

We will see in the last examples how to deploy a verticle. For now, simply note that if you need to create your own verticle, you must extend the AbstractVerticle class.

In this class, to manage the deployment and un-deployment parts, you can override these four methods:

  • start()
  • start(Future<Void>)
  • stop()
  • stop(Future<Void>)

The start methods are called by Vert.x when an instance is deployed, and the stop methods when an instance is un-deployed.
The methods taking a Future<Void> parameter give you the capability to indicate asynchronously whether the instance has been successfully deployed.

final Router router = Router.router(vertx);

The router object is created from the vertx one, a protected field inherited from AbstractVerticle.

router.route(HttpMethod.GET, "/hello/:name").handler(routingContext -> {
	// Retrieving request and response objects
}

We use route to expose a new GET method on a /hello resource with a name parameter.

If this verticle is a standard one, the handler will be invoked on the event loop. This means the implementation must never, ever block. As we mentioned, each time an operation can be managed asynchronously, we need to dispatch it to a dedicated handler.
The routingContext parameter of the lambda contains the request context.

HttpServerRequest request = routingContext.request();
HttpServerResponse response = routingContext.response();

From the routingContext, we retrieve the incoming request and the response we are going to format.

String name = request.getParam("name");

We create a new String based on the input name parameter.

response.putHeader("Content-Type", "text/plain");
response.setChunked(true);
response.write("Hello " + name);
response.setStatusCode(200);

The response is then formatted (content, content type, etc.).

response.end();

At the end, when the response is formatted, we invoke the end() method to tell the HttpServerResponse to send back the reply.

vertx.createHttpServer().requestHandler(router::accept).listen(8080, result -> {
	if (result.succeeded()) {
		startFuture.complete();
	} else {
		startFuture.fail(result.cause());
	}
});

After having configured our first route, we now want to listen for HTTP requests on port 8080.
Based upon this configuration, we can then indicate to the startFuture future whether the verticle has started correctly.
If there is no particular problem (such as the port already being in use), we call the complete() method. Otherwise we invoke fail(Throwable).

And that’s it! You have just created your first REST service.

Manipulate Json data and execute blocking code

Github example

In this example, we will expose a POST method, but we will also see how to manipulate JSON using the Vert.x API and how to manage blocking code from a verticle.

public void start(Future<Void> startFuture) {
	final Router router = Router.router(vertx);

	// Create a BodyHandler to have the capability to retrieve the body
	router.route().handler(BodyHandler.create());

	router.route(HttpMethod.POST, "/file").handler(routingContext -> {
		HttpServerResponse response = routingContext.response();

		// Retrieve the body as a JsonObject
		JsonObject body = routingContext.getBodyAsJson();
		// Get the filename attribute value
		String filename = body.getString("filename");

		// Execute a blocking part
		vertx.executeBlocking(future -> {
			byte[] bytes = parseLargeFile(filename);
			// Complete the future with the generated objects in the
			// blocking method
			future.complete(bytes);
		} , res -> {
			if (res.succeeded()) {
				// If the blocking part succeeded
				byte[] bytes = (byte[]) res.result();

				response.putHeader("Content-Type", "application/octet-stream");
				response.setChunked(true);
				response.write(Buffer.buffer(bytes));
				response.setStatusCode(200);
				response.end();
			} else {
				// Otherwise we manage to return an error
				response.setStatusCode(500);
				response.setStatusMessage("Internal error: " + res.cause().getMessage());
				response.end();
			}
		});
	});

	vertx.createHttpServer().requestHandler(router::accept).listen(8080, result -> {
		if (result.succeeded()) {
			startFuture.complete();
		} else {
			startFuture.fail(result.cause());
		}
	});
}

The first part is similar except that here we are exposing a POST method on a /file resource.

JsonObject body = routingContext.getBodyAsJson();
String filename = body.getString("filename");

The Vert.x core API provides mechanisms to manipulate JSON data.
In this case we simply ask the routingContext to parse the body as a JsonObject.
We then create a filename String using the getString(String) method.

vertx.executeBlocking(future -> {
	//Blocking part
	future.complete();
} , res -> {
	if (res.succeeded()) {
		//Success
	} else {
		//Failure
	}
});

Same warning as before: because we are inside an event loop, we must take care never to block it.
Nevertheless, let's imagine that you must call a blocking method, for instance to read a file (even if, for I/O operations, we will see in the next example that Vert.x provides a non-blocking API) or anything else based on a blocking API.

Vert.x allows doing it from your event loop implementation by simply encapsulating the blocking part within an asynchronous handler.

vertx.executeBlocking(future -> {
	byte[] bytes = parseLargeFile(filename);
	// Complete the future with the generated objects in the
	// blocking method
	future.complete(bytes);
}

In our example we encapsulate a blocking parseLargeFile() method within a future.

res -> {
	if (res.succeeded()) {
		byte[] bytes = (byte[]) res.result();
		response.putHeader("Content-Type", "application/octet-stream");
		response.setChunked(true);
		response.write(Buffer.buffer(bytes));
		response.setStatusCode(200);
		response.end();
	} else {
		response.setStatusCode(500);
		response.setStatusMessage("Internal error: " + res.cause().getMessage());
		response.end();
	}
});

The res object is, in that case, an AsyncResult containing the outcome of the previous future.
Here we set up one handler for the case where the future succeeded and another one otherwise.

Orchestrate asynchronous executions

Github example

In this example, we will show the different ways to orchestrate asynchronous executions.

Let’s imagine we have implemented these three methods:

// Read file method
private Future<String> readFile() {
	Future<String> future = Future.future();
	
	// Retrieve a FileSystem object from vertx instance and call the
	// non-blocking readFile method
	vertx.fileSystem().readFile("src/main/resources/example03/read.txt", handler -> {
		if (handler.succeeded()) {
			System.out.println("Read content: " + handler.result());
			future.complete("read success");
		} else {
			System.err.println("Error while reading from file: " + handler.cause().getMessage());
			future.fail(handler.cause());
		}
	});
	
	return future;
}

// Write file method
private Future<String> writeFile(String input) {
	Future<String> future = Future.future();
	
	String file = "src/main/resources/example03/write.txt";
	
	// Retrieve a FileSystem object from vertx instance and call the
	// non-blocking writeFile method
	vertx.fileSystem().writeFile(file, Buffer.buffer(input), handler -> {
		if (handler.succeeded()) {
			System.out.println("File written with " + input);
			future.complete(file);
		} else {
			System.err.println("Error while writing in file: " + handler.cause().getMessage());
			future.fail(handler.cause());
		}
	});
	
	return future;
}

// Copy file method
private Future<String> copyFile(String input) {
	Future<String> future = Future.future();
	
	// Retrieve a FileSystem object from the vertx instance and call the
	// non-blocking copy method
	vertx.fileSystem().copy(input, "src/main/resources/example03/writecopy.txt", handler -> {
		if (handler.succeeded()) {
			System.out.println("Copy done of " + input);
			future.complete("Copy success");
		} else {
			System.err.println("Error while copying a file: " + handler.cause().getMessage());
			future.fail(handler.cause());
		}
	});
	
	return future;
}

Each method is now based upon a non-blocking API provided by Vert.x. We set up handlers for success and failure.

The next challenge will be to orchestrate them.

In a synchronous world you could have done something like:
copyFile(writeFile(readFile()))

Of course, with asynchronous developments, this is no longer possible.
There are two ways to do it. Let's see the first one:

// Orchestration based on setHandler
private void method1() {
	// Create first step
	Future<String> future1 = readFile();

	// Set handler on future1
	future1.setHandler(res1 -> {
		if (res1.succeeded()) {
			Future<String> future2 = writeFile(res1.result());

			// Set handler on future 2
			future2.setHandler(res2 -> {
				if (res2.succeeded()) {
					Future<String> future3 = copyFile(res2.result());

					// Set handler on future 3
					future3.setHandler(res3 -> {
						if (res3.succeeded()) {
							System.out.println(res3.result());
						} else {
							// Manage doSmthg3 errors
						}
					});
				} else {
					// Manage doSmthg2 errors
				}
			});
		} else {
			// Manage doSmthg1 errors
		}
	});
}

Pretty verbose right? Let’s dig into the code.

Future<String> future1 = readFile();

The first action is to retrieve the Future object sent back by the readFile() method.

future.setHandler(res -> {
	if (res.succeeded()) {
		// If the future succeeded
		Object o = res.result();
		// Do something...
	} else {
		// If the future failed
	}
});

The rest of the code is basically a composition of such patterns.
We set up a handler for when the future succeeds and another one for when it fails. The result of the previous execution is retrieved with the result() method.

Since 3.2.1, there is another way, less verbose, to orchestrate asynchronous functions:

// Orchestration based on compose
private void method2() {
	// Create first step
	Future<String> future1 = readFile();

	// Define future1 composition
	future1.compose(s1 -> {
		Future<String> future2 = writeFile(future1.result());

		// Define future2 composition
		future2.compose(s2 -> {
			Future<String> future3 = copyFile(future2.result());

			// Because the future3 is the last, we define here a handler
			future3.setHandler(handler -> {
				if (handler.succeeded()) {
					System.out.println(handler.result());
				} else {
					// Manage doSmthg3 errors
				}
			});
		} , Future.future().setHandler(handler -> {
			// Manage doSmthg2 errors
		}));
	} , Future.future().setHandler(handler -> {
		// Manage doSmthg1 errors
	}));
}

The recurring pattern is now the following:

future.compose(res -> {
	// If the future succeeded; here, res directly holds its result
	// Do something with res
}, Future.future().setHandler(handler -> {
	// If the future failed
}));

The compose method takes as arguments a Handler invoked if the execution succeeded, and a Future to which the failure (if any) will be propagated.
At first glance, this implementation may seem less natural, but it is about 30% shorter in lines of code than the first implementation with setHandler().

Last but not least, it is worth noting that the CompositeFuture object offers static methods to implement handlers on the completion of a list of futures, for example. This might be useful if all your futures are independent (one future does not depend on the execution of another one).
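CompositeFuture is Vert.x-specific, but the idea is close to the JDK's CompletableFuture.allOf. As a rough stdlib analogy (not Vert.x code) for waiting on independent results:

```java
import java.util.concurrent.CompletableFuture;

public class CompositeDemo {
	public static void main(String[] args) {
		// Two independent asynchronous results
		CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "read success");
		CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "copy success");

		// The combined stage completes only when both futures have completed
		CompletableFuture.allOf(f1, f2)
				.thenRun(() -> System.out.println(f1.join() + " / " + f2.join()))
				.join();
	}
}
```

The Vert.x equivalent plays the same role: one handler fires once every future in the list has completed, instead of chaining them one after the other.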

Use the Event Bus to send and receive messages

Github example

Let’s get on the Event Bus to see how to receive and send/publish messages.

public void start(Future<Void> startFuture) {
	//Retrieve the EventBus object from the vertx one
	EventBus eventBus = vertx.eventBus();

	//Create an Event Bus consumer typed for JsonObject messages
	MessageConsumer<JsonObject> createConsumer = eventBus.consumer("customer.create");
	
	//Handle new messages on customer.create endpoint
	createConsumer.handler(json -> {
		System.out.println("Received new customer: " + json.body());
		//Enrich the customer object
		JsonObject enrichedCustomer = enrichCustomer(json.body());
		
		//Publish (one-to-many) the enriched message on another endpoint
		eventBus.publish("customer.completion", enrichedCustomer);
	});
	
	startFuture.complete();
}

Let’s detail the key steps.

EventBus eventBus = vertx.eventBus();

The Event Bus instance is retrieved from the vertx object.

MessageConsumer<JsonObject> createConsumer = eventBus.consumer("customer.create");

We create a JsonObject-typed MessageConsumer with a simple call to the consumer() method, defining a logical name for the channel.

createConsumer.handler(json -> {
});

From the MessageConsumer object, we then define a handler that will be executed each time a new message arrives on the customer.create channel.
The json parameter is the received Message, whose body is directly typed as a JsonObject.

Let’s now imagine that after having enriched the JsonObject, we now want to republish it onto another channel. This can be simply done using:

eventBus.publish("customer.completion", enrichedCustomer);

In this example we called the publish() method (one-to-many), but if we wanted a point-to-point interaction, we would have called the send() method (one-to-one).
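For the request-reply variant mentioned earlier, send() also accepts a reply handler as its last argument. A sketch, assuming the Vert.x 3.x API (the consumer logic and payloads here are illustrative, not part of the example above):

```java
// Consumer side: reply to the incoming message (request-reply)
eventBus.consumer("customer.create", message ->
		message.reply(new JsonObject().put("status", "created")));

// Sender side: pass a reply handler as the last argument to send()
eventBus.send("customer.create", new JsonObject().put("name", "John"), reply -> {
	if (reply.succeeded()) {
		System.out.println("Reply: " + reply.result().body());
	} else {
		System.err.println("No reply: " + reply.cause().getMessage());
	}
});
```

The reply handler receives an AsyncResult wrapping the reply Message, so failures (for instance, no consumer registered) can be handled in the same place.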

If the Event Bus consumer is defined within a standard verticle, the same event loop model applies. So, same warning: be careful never to block your implementation.

Deploy verticles

Github example

As we have seen there are three different types of verticles. Let’s first see how to deploy a standard one:

vertx.deployVerticle("org.enterpriseintegration.Standard",
	// Instantiate a DeploymentOptions by setting an explicit number
	// of instances and referencing a JSON configuration
	new DeploymentOptions().setInstances(instances).setConfig(config), res -> {
		if (res.succeeded()) {
			System.out.println("Standard verticle deployed");

			// Send messages on the event bus
			EventBus eventBus = vertx.eventBus();
			eventBus.publish("event", "event01");
			eventBus.send("event", "event02");
			eventBus.send("event", "event03");
		} else {
			System.out.println("Error while deploying a verticle: " + res.cause().getMessage());
		}
	}
);

The deployVerticle() method takes as a first argument either a String referencing the implementation class or a Verticle object.
The second argument is a DeploymentOptions object, used to set up options during the verticle deployment.

Here we can set the number of instances (or event loops) for a verticle by calling the setInstances() method.
The setConfig() call shows how to pass information from the parent to the verticle instance. The config object is a JsonObject.

One important thing: if we define an Event Bus consumer on the event channel with two verticle instances, the published message event01 will be received by both instances. This may not be the expected behavior, so you either have to restrict the number of instances to one or use another verticle type.

The following example shows how to deploy a worker verticle:

vertx.deployVerticle("org.enterpriseintegration.Worker",
	// Instantiate a DeploymentOptions by setting an explicit number
	// of instances and enabling worker mode
	new DeploymentOptions().setInstances(instances).setWorker(true), res -> {
		if (res.succeeded()) {
			System.out.println("Worker verticle deployed");

			// Send messages on the event bus
			EventBus eventBus = vertx.eventBus();
			eventBus.publish("event", "event01");
			eventBus.send("event", "event02");
			eventBus.send("event", "event03");
		} else {
			System.out.println("Error while deploying a verticle: " + res.cause().getMessage());
		}
	}
);

The only change (except that here we do not show how to pass a configuration object) is the call to the setWorker() method. This method takes a boolean argument simply indicating whether the verticle is a worker.

Usually, because the worker type follows the more classical one event = one thread model, we need to set a higher number of instances than for standard verticles.

With the same deployment configuration (two instances), the worker verticle will still receive event01 twice, and will then handle event02 and event03. So the same warning as before: this may lead to duplicates.

The solution for this very scenario is to use a multi-threaded worker:

vertx.deployVerticle("org.enterpriseintegration.MTWorker",
	// Instantiate a DeploymentOptions enabling the worker and
	// multi-threaded modes, without setting the number of instances
	new DeploymentOptions().setWorker(true).setMultiThreaded(true), res -> {
		if (res.succeeded()) {
			System.out.println("Multi-threaded Worker verticle deployed");

			// Send messages on the event bus
			EventBus eventBus = vertx.eventBus();
			eventBus.publish("event", "event01");
			eventBus.send("event", "event02");
			eventBus.send("event", "event03");

		} else {
			System.out.println("Error while deploying a verticle: " + res.cause().getMessage());
		}
	}
);

In that example, we called the setMultiThreaded() method with true.
As you can see, here we do not configure the number of instances. For a multi-threaded worker verticle, this value is not used.

A multi-threaded worker is managed by only one single instance but can be executed concurrently by several threads. In comparison a default worker is managed by one or several instances but each instance is executed by only one thread.

In this example, event01 will be received only once (as will event02 and event03, of course).

What else?

In this first post we covered the core principles of Vert.x: Event loop model, Verticle and Event Bus.

As we have seen, Vert.x is a dedicated framework for implementing reactive applications. It is also worth adding that the Verticle model fits really well with microservice concepts.

In addition to what we have seen, Vert.x also offers many other features: an Rx API, a standard for reactive streams and fibers, tools for generating code or documentation, integration with cloud providers such as OpenShift, a Docker integration, bridges with service discovery tools such as Consul, etc.

Vert.x is definitely a very rich framework and this post will only be the first of a series.

If you are interested in the solution, you should check out the following links:
