MySqlCdc

MySQL/MariaDB binlog change data capture (CDC) connector for .NET

Implements the MySQL replication protocol to stream binary log events in real time.

Use cases

Transaction log events are immutable and appended in strictly sequential order. This simplifies your concurrency model and allows you to avoid distributed locks that handle race conditions from parallel database requests.

Warnings

Be careful when working with binary log event streaming.

Limitations

Please note the lib currently has the following limitations:

Prerequisites

Please make sure the following requirements are met:

  1. The user is granted the REPLICATION SLAVE and REPLICATION CLIENT privileges.

  2. Binary logging is enabled (this is the default in MySQL 8). To enable binary logging, configure the following settings on the master server and restart the service:

    binlog_format = row
    binlog_row_image = full
    

    MySQL 5.6/5.7 and MariaDB 10.1 also require the following line:

    server-id = 1
    
  3. Optionally, in MySQL 8.0.1+ / MariaDB 10.5+ you can enable logging of table metadata (column names and types; see the TableMetadata class).

    binlog_row_metadata = full
    
  4. Optionally, you can enable logging of the SQL queries that precede row-based events and listen for RowsQueryEvent.

    MySQL

    binlog_rows_query_log_events = on
    

    MariaDB

    binlog_annotate_row_events = on
    
  5. Also note that the expire_logs_days and binlog_expire_logs_seconds settings control how long binlog files are kept. By default, MySQL/MariaDB have an expiration time set and delete expired binlog files. You can disable automatic purging of binlog files this way:

    expire_logs_days = 0
    

Example

You have to obtain the ordinal positions of the columns of the table you are interested in. The library does not do this for you, so you have to use another tool, for example the following query:

select TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, ORDINAL_POSITION
from INFORMATION_SCHEMA.COLUMNS
where TABLE_NAME='AspNetUsers' and TABLE_SCHEMA='Identity'
order by ORDINAL_POSITION;

Alternatively, in MySQL 8.0.1 / MariaDB 10.5 and newer you can obtain column names by logging full metadata (see TableMetadataEvent.Metadata). The metadata is then logged with each TableMapEvent, which increases bandwidth usage.

binlog_row_metadata = full

Binlog event stream replication

Row data is stored in the Cells property of row events, in the same order as the columns' ordinal positions. See the C# sample project.

var client = new BinlogClient(options =>
{
    options.Port = 3306;
    options.Username = "root";
    options.Password = "Qwertyu1";
    options.SslMode = SslMode.Disabled;
    options.HeartbeatInterval = TimeSpan.FromSeconds(30);
    options.Blocking = true;

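    // Only one starting position applies: each assignment to options.Binlog below
    // overwrites the previous one, so keep just the one you need.

    // Start replication from MariaDB GTID. Recommended.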
    options.Binlog = BinlogOptions.FromGtid(GtidList.Parse("0-1-270"));

    // Start replication from MySQL GTID. Recommended.
    var gtidSet = "d4c17f0c-4f11-11ea-93e3-325d3e1cd1c8:1-107, f442510a-2881-11ea-b1dd-27916133dbb2:1-7";
    options.Binlog = BinlogOptions.FromGtid(GtidSet.Parse(gtidSet));

    // Start replication from the master binlog filename and position
    options.Binlog = BinlogOptions.FromPosition("mysql-bin.000008", 195);

    // Start replication from the master last binlog filename and position.
    options.Binlog = BinlogOptions.FromEnd();

    // Start replication from the master's first available (not purged) binlog filename and position.
    options.Binlog = BinlogOptions.FromStart();
});

await foreach (var (header, binlogEvent) in client.Replicate())
{
    var state = client.State;

    if (binlogEvent is TableMapEvent tableMap)
    {
        await HandleTableMapEvent(tableMap);
    }
    else if (binlogEvent is WriteRowsEvent writeRows)
    {
        await HandleWriteRowsEvent(writeRows);
    }
    else if (binlogEvent is UpdateRowsEvent updateRows)
    {
        await HandleUpdateRowsEvent(updateRows);
    }
    else if (binlogEvent is DeleteRowsEvent deleteRows)
    {
        await HandleDeleteRowsEvent(deleteRows);
    }
    else await PrintEventAsync(binlogEvent);
}
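
Because cells arrive in ordinal order, the column names returned by the INFORMATION_SCHEMA query can be paired with a row's cells by index. The following is a minimal sketch of that idea, not library code: `ToNamedRow` is a hypothetical helper, and the column names and values are made up for illustration.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of MySqlCdc: pairs each cell with the column
// name at the same index (ORDINAL_POSITION is 1-based, the lists are 0-based).
static Dictionary<string, object?> ToNamedRow(
    IReadOnlyList<string> columnNames, IReadOnlyList<object?> cells)
{
    if (columnNames.Count != cells.Count)
        throw new ArgumentException(
            "Column count does not match cell count; the table schema may have changed.");

    var row = new Dictionary<string, object?>(cells.Count);
    for (var i = 0; i < cells.Count; i++)
        row[columnNames[i]] = cells[i];
    return row;
}

// Column names ordered by ORDINAL_POSITION (illustrative values only).
var columns = new[] { "Id", "UserName", "Email" };
// Stand-in for the Cells of one row event.
var cells = new object?[] { 42, "alice", null };

var named = ToNamedRow(columns, cells);
Console.WriteLine(named["UserName"]); // prints "alice"
```

The length check is a cheap guard: if the table was altered after the column list was fetched, the cached names no longer line up with the cells and should be refreshed.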

A typical transaction has the following structure.

  1. GtidEvent if GTID mode is enabled.
  2. One or many TableMapEvent events.
    • One or many WriteRowsEvent events.
    • One or many UpdateRowsEvent events.
    • One or many DeleteRowsEvent events.
  3. XidEvent indicating commit of the transaction.
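
The structure above suggests buffering events until the commit arrives, so that a consumer only acts on complete transactions. The sketch below illustrates this with plain strings standing in for the event types. `GroupByTransaction` is a hypothetical helper, not part of the library, and it assumes every transaction ends in XidEvent, which may not hold for DDL statements or non-transactional tables.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Event kinds are plain strings here to keep the sketch self-contained;
// real code would pattern-match on the library's event types instead.
static List<List<string>> GroupByTransaction(IEnumerable<string> eventNames)
{
    var transactions = new List<List<string>>();
    var current = new List<string>();

    foreach (var name in eventNames)
    {
        current.Add(name);

        // XidEvent marks the commit, so the buffered events form one complete transaction.
        if (name == "XidEvent")
        {
            transactions.Add(current);
            current = new List<string>();
        }
    }

    // Events still in `current` belong to a transaction that has not committed yet
    // and are intentionally not returned.
    return transactions;
}

var stream = new[]
{
    "GtidEvent", "TableMapEvent", "WriteRowsEvent", "XidEvent",
    "GtidEvent", "TableMapEvent", "UpdateRowsEvent", "DeleteRowsEvent", "XidEvent",
    "GtidEvent", "TableMapEvent", // still open: no XidEvent yet
};

var committed = GroupByTransaction(stream);
Console.WriteLine(committed.Count); // prints 2
```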

It's best practice to use GTID replication with the FromGtid method. With this approach you can correctly perform replication failover. Note that in GTID mode FromGtid has the following behavior:

Reading binlog files offline

In some cases you will need to read binlog files offline from the file system. This can be done using the BinlogReader class.

using (Stream stream = File.OpenRead("mysql-bin.000001"))
{
    // `mariadb` is a bool you define yourself: true for MariaDB binlog files, false for MySQL.
    EventDeserializer deserializer = mariadb
        ? new MariaDbEventDeserializer()
        : new MySqlEventDeserializer();

    var reader = new BinlogReader(deserializer, stream);

    await foreach (var (header, binlogEvent) in reader.ReadEvents())
    {
        await PrintEventAsync(binlogEvent);
    }
}

Type mapping notes

| MySQL Type | .NET type |
|---|---|
| BLOB types | byte[] |
| GEOMETRY | byte[] |
| JSON (MySQL) | byte[], see below |
| JSON (MariaDB) | byte[], see below |
| BIT | bool[] |
| TINYINT | byte |
| SMALLINT | short |
| MEDIUMINT | int(3), see below |
| INT | int |
| BIGINT | long |
| FLOAT | float |
| DOUBLE | double |
| DECIMAL | string |
| VARCHAR, VARBINARY | string |
| CHAR | string |
| ENUM | int |
| SET | long |
| YEAR | int |
| DATE | Nullable<DateOnly> |
| DATETIME | Nullable<DateTime> |
| TIME | TimeSpan |
| TIMESTAMP | DateTimeOffset |
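
DECIMAL cells arrive as strings. One plausible reason is that MySQL DECIMAL allows up to 65 digits, while System.Decimal holds roughly 28 to 29 significant digits. If your values are known to fit, a conversion along these lines may be convenient. `TryToDecimal` is a hypothetical helper, not part of the library, and the sketch assumes the string uses `.` as the decimal separator.

```csharp
#nullable enable
using System;
using System.Globalization;

// Hypothetical helper, not part of MySqlCdc: converts a DECIMAL cell (a string)
// to System.Decimal, or returns null when the value is out of range or malformed.
static decimal? TryToDecimal(string text)
{
    // Accept an optional leading sign and a decimal point; the invariant culture
    // ensures '.' is treated as the decimal separator regardless of machine locale.
    const NumberStyles styles =
        NumberStyles.AllowLeadingSign | NumberStyles.AllowDecimalPoint;

    return decimal.TryParse(text, styles, CultureInfo.InvariantCulture, out var value)
        ? value
        : (decimal?)null;
}

Console.WriteLine(TryToDecimal("-12.50").HasValue);           // prints True
Console.WriteLine(TryToDecimal(new string('9', 65)).HasValue); // prints False (too large)
```

Values with more fractional digits than System.Decimal can represent may be rounded rather than rejected, so keep the original string when exact precision matters.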

Similar projects

The project is based on the mysql-binlog-connector-java library and on the MariaDB and MySQL documentation.

Supported versions

MySqlCdc supports both MariaDB and MySQL servers.

| MariaDB | Status |
|---|---|
| 10.1 | ✅ Supported |
| 10.2 | ✅ Supported |
| 10.3 | ✅ Supported |
| 10.4 | ✅ Supported |
| 10.5 | ✅ Supported |
| 10.6 | ✅ Supported |

| MySQL | Status |
|---|---|
| 5.6 | ✅ Supported |
| 5.7 | ✅ Supported |
| 8.0 | ✅ Supported |

License

The library is provided under the MIT License.