MySQL Binlog client for Vert.x

A Vert.x client for tapping into the MySQL replication stream. Based on Vert.x 3.4.1.

It uses the MySQL Binary Log connector, a Java implementation of the MySQL binlog protocol, to interact with MySQL.

How to use

Configure your MySQL master

Make sure the binlog is enabled on the MySQL master and uses the ROW format; otherwise the client cannot receive any row events.

To enable the binary log, start the server with the --log-bin=base_name option. For example:

mysqld --log-bin=mysql-bin --binlog-format=ROW

To specify the format globally for all clients, set the global value of the binlog_format system variable:

SET GLOBAL binlog_format = 'ROW';

Using Binlog Client

Get the latest JAR(s) from here. Alternatively, you can include the following Maven dependency (available through Maven Central):

<dependency>
    <groupId>io.github.guoyu511</groupId>
    <artifactId>vertx-mysql-binlog</artifactId>
    <version>0.2.0</version>
</dependency>

Tapping into replication stream

In order to connect to MySQL as a slave, you need a BinlogClient instance first.

You can create a client by passing a BinlogClientOptions:

BinlogClient binlogClient = BinlogClient.create(vertx, binlogClientOptions);

The BinlogClientOptions contains the following values:

Make sure the user has the REPLICATION CLIENT privilege for the given schema.

You can then connect to the MySQL master with the connect method.

The connection is established asynchronously, so the client may not be connected until some time after the call has returned:

binlogClient.connect();

You can also supply a handler, which is called once the connection has been established (or has failed).

binlogClient.connect((ar) -> {
  boolean connected = ar.succeeded(); // true if the connection was established
});

Once connected to the MySQL master as a slave, the client can start handling events.

Handle Row Events

The MySQL binlog protocol defines several types of events. This client is only concerned with events related to data modification: write, update and delete. All events are presented as JsonObject.

You can set a handler on the client to handle these events.

binlogClient.handler((event) -> {
  String type = event.getString("type");
});

For a data modification event (write / update / delete), the JsonObject looks like this:

{
  "type" : "write",
  "schema" : "test_db",
  "table" : "test_table",
  "row" : {
    "id" : 1000,
    "name" : "guoyu"
  }
}
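
As a rough illustration of dispatching on the type field, the sketch below uses a plain java.util.Map as a stand-in for the JsonObject so that it runs without Vert.x on the classpath. The class RowEventDescriber and its describe method are hypothetical and not part of this client; inside a real handler the same fields would be read with event.getString("type") and event.getJsonObject("row").

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the binlog client.
final class RowEventDescriber {

  // Builds a one-line summary of a row event; a Map stands in for JsonObject.
  static String describe(Map<String, Object> event) {
    String type = String.valueOf(event.get("type"));
    String target = event.get("schema") + "." + event.get("table");
    Object row = event.get("row");
    switch (type) {
      case "write":
        return "inserted into " + target + ": " + row;
      case "update":
        return "updated in " + target + ": " + row;
      case "delete":
        return "deleted from " + target + ": " + row;
      default:
        return "ignored event of type " + type;
    }
  }

  public static void main(String[] args) {
    // Rebuild the sample event shown above.
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("id", 1000);
    row.put("name", "guoyu");
    Map<String, Object> event = new LinkedHashMap<>();
    event.put("type", "write");
    event.put("schema", "test_db");
    event.put("table", "test_table");
    event.put("row", row);
    // prints "inserted into test_db.test_table: {id=1000, name=guoyu}"
    System.out.println(describe(event));
  }
}
```

Unknown types fall through to the default branch, so events that are not write, update or delete are reported rather than silently dropped.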

The row event contains the following values:

Column mapping

The original ROW events sent by the MySQL master contain the column index but not the column name.

Therefore, the BinlogClient uses a MySQLClient instance to query the column indexes and names from the information_schema database the first time it receives a ROW event for a table. It then caches the column mapping and uses it to build the event object.

When a DROP TABLE, ALTER TABLE or CREATE TABLE event is received, the cached mapping is cleared and the BinlogClient queries the column mapping again for subsequent ROW events.
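
The caching behaviour described above can be sketched roughly as follows. This is an illustration only, not the BinlogClient's actual implementation: the class ColumnMappingCache, its methods and the loader function are hypothetical names, and the lookup here is synchronous, whereas the real client queries information_schema through its asynchronous MySQLClient.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Illustrative only: not the BinlogClient's actual implementation.
final class ColumnMappingCache {
  private final Map<String, List<String>> columnsByTable = new ConcurrentHashMap<>();
  private final Function<String, List<String>> loader;

  // loader is a hypothetical lookup that returns the column names in ordinal
  // order for a "schema.table" key, e.g. by querying information_schema.
  ColumnMappingCache(Function<String, List<String>> loader) {
    this.loader = loader;
  }

  // Returns the cached names, calling the loader only on first use of a table.
  List<String> columnsFor(String schema, String table) {
    return columnsByTable.computeIfAbsent(schema + "." + table, loader);
  }

  // To be called when a DROP TABLE, ALTER TABLE or CREATE TABLE event is seen.
  void invalidateAll() {
    columnsByTable.clear();
  }

  // Pairs the positional values of a row event with the cached column names.
  Map<String, Object> toNamedRow(String schema, String table, Object[] values) {
    List<String> names = columnsFor(schema, table);
    Map<String, Object> row = new LinkedHashMap<>();
    for (int i = 0; i < values.length && i < names.size(); i++) {
      row.put(names.get(i), values[i]);
    }
    return row;
  }
}
```

Clearing the whole cache on any DDL statement is coarse but simple: the next row event for each table triggers one fresh lookup, so a stale mapping is never applied after a schema change.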

Using as ReadStream

The BinlogClient implements the ReadStream<JsonObject> interface, which means all the methods provided by ReadStream are available.

For example, use pause to pause reading (this stops reading from the underlying InputStream):

binlogClient.pause();

Use resume to continue:

binlogClient.resume();

You can even use Pump to pump the events to another WriteStream:

Pump.pump(binlogClient, targetStream).start();

Or pump the stream to an event bus message producer:

Pump.pump(binlogClient, eventBus.<JsonObject>sender("binlog.event")).start();

// handle the events on the event bus
eventBus.<JsonObject>consumer("binlog.event", (msg) -> {
  JsonObject json = msg.body();
});

Using with RxJava

An Rx-ified version of the binlog client is also provided.

To use the Rx-ified API, create a binlog client instance using the BinlogClient interface in the io.vertx.rxjava.ext.binlog.mysql package, together with the Rx version of Vertx.

import io.vertx.rxjava.ext.binlog.mysql.BinlogClient;

BinlogClient rxBinlogClient = BinlogClient.create(rxVertx, options);

Or wrap an existing client:

BinlogClient rxClient = BinlogClient.newInstance(client);

You can then use the client as an Observable, for example to handle all update events:

rxClient.toObservable()
  .filter((event) -> "update".equals(event.getString("type")))
  .subscribe((event) -> {
    // do something with this event
  });

Binlog Filename and position

Sometimes you need to know the binlog filename and position that the replication stream has reached.

For example, you may want to save the filename and position whenever an event arrives.

You can retrieve them with the filename and position methods:

binlogClient.handler((event) -> {
  // an event has arrived
  String filename = binlogClient.filename();
  long position = binlogClient.position();
  // save them in any way for future use
});

The next time you create a client, you can pass the filename and position to the BinlogClientOptions so that the client connects at the specified position.

binlogClientOptions.setFilename(filename);
binlogClientOptions.setPosition(position);
BinlogClient binlogClient = BinlogClient.create(vertx, binlogClientOptions);

This ensures that you will not lose any events.
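
One possible way to save the filename and position between runs is a small properties file. The following is a minimal sketch using only the JDK; the class BinlogCheckpoint, its fields and the property keys are hypothetical and not part of this client. It uses blocking file I/O, so in a verticle it is assumed to run off the event loop (for example inside executeBlocking).

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical helper for persisting the last seen binlog coordinates.
final class BinlogCheckpoint {
  final String filename;
  final long position;

  BinlogCheckpoint(String filename, long position) {
    this.filename = filename;
    this.position = position;
  }

  // Writes the coordinates to a properties file, replacing any previous content.
  void save(Path file) throws IOException {
    Properties props = new Properties();
    props.setProperty("filename", filename);
    props.setProperty("position", Long.toString(position));
    try (OutputStream out = Files.newOutputStream(file)) {
      props.store(out, "binlog checkpoint");
    }
  }

  // Reads the coordinates back; returns null when no checkpoint exists yet.
  static BinlogCheckpoint load(Path file) throws IOException {
    if (!Files.exists(file)) {
      return null;
    }
    Properties props = new Properties();
    try (InputStream in = Files.newInputStream(file)) {
      props.load(in);
    }
    return new BinlogCheckpoint(
        props.getProperty("filename"),
        Long.parseLong(props.getProperty("position")));
  }
}
```

On startup, a loaded checkpoint would supply the values for setFilename and setPosition; when load returns null, the options are left at their defaults. Saving the checkpoint only after an event has been fully processed means an event may be delivered again after a crash, but it is not skipped.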

Closing the client

You can hold on to the client for a long time (e.g. the life-time of your verticle).

Once you have finished with it, you should close it:

binlogClient.close();
// or close with a callback handler
binlogClient.close((ar) -> {});

Running the Tests

You can run tests with a specified MySQL instance:

% mvn test -Dbinlog.host=[host] -Dbinlog.port=[port] -Dbinlog.user=[user] -Dbinlog.password=[password] -Dbinlog.schema=[schema]

The user must have ALL privileges for the given schema.