NAME

Kafka - Apache Kafka low-level synchronous API, which does not use Zookeeper.

VERSION

This documentation refers to Kafka package version 1.08.

SYNOPSIS

use 5.010;
use strict;
use warnings;

use Scalar::Util qw(
    blessed
);
use Try::Tiny;

use Kafka qw(
    $BITS64
);
use Kafka::Connection;
use Kafka::Producer;
use Kafka::Consumer;

# A simple example of Kafka usage

# common information
say 'This is Kafka package ', $Kafka::VERSION;
say 'You have a ', $BITS64 ? '64' : '32', ' bit system';

my ( $connection, $producer, $consumer );
try {

    #-- Connect to local cluster
    $connection = Kafka::Connection->new( host => 'localhost' );
    #-- Producer
    $producer = Kafka::Producer->new( Connection => $connection );
    #-- Consumer
    $consumer = Kafka::Consumer->new( Connection  => $connection );

} catch {
    my $error = $_;
    if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
        warn 'Error: (', $error->code, ') ',  $error->message, "\n";
        exit;
    } else {
        die $error;
    }
};

# cleaning up
undef $consumer;
undef $producer;
$connection->close;
undef $connection;

# another brief code example of the Kafka package
# is provided in the "An Example" section.

ABSTRACT

The Kafka package is a set of Perl modules that provide a simple and consistent application programming interface (API) to Apache Kafka 0.9+, a high-throughput distributed messaging system.

DESCRIPTION

The user modules in this package provide an object-oriented API. The IO agents, the requests sent, and the responses received from the Apache Kafka or mock servers are all represented by objects. This provides a simple and powerful interface to these services.

The main features of the package are:

APACHE KAFKA'S STYLE COMMUNICATION

The Kafka package is based on Kafka's 0.9+ Protocol specification document at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

The Connection Object

Clients use the Connection object to communicate with the Apache Kafka cluster. The Connection object is an interface layer between your application code and the Apache Kafka cluster.

A Connection object is required to create instances of the Kafka::Producer and Kafka::Consumer classes.

The Kafka Connection API is implemented by the Kafka::Connection class.

use Kafka::Connection;

# connect to local cluster with the defaults
my $connection = Kafka::Connection->new( host => 'localhost' );

The main attributes of the Connection object are:

The IO Object

The Kafka::Connection object uses the internal class Kafka::IO to maintain communication with a particular server of the Kafka cluster. The IO object is an interface layer between the Kafka::Connection object and the network.

The Kafka IO API is implemented by the Kafka::IO class. Note that end users normally have no need to use Kafka::IO directly and should work with Kafka::Connection instead.

use Kafka::IO;

# connect to local server with the defaults
my $io = Kafka::IO->new( host => 'localhost' );

The main attributes of the IO object are:

The Producer Object

The Kafka producer API is implemented by the Kafka::Producer class.

use Kafka::Producer;

#-- Producer
my $producer = Kafka::Producer->new( Connection => $connection );

# Sending a single message
$producer->send(
    'mytopic',          # topic
    0,                  # partition
    'Single message'    # message
);

# Sending a series of messages
$producer->send(
    'mytopic',          # topic
    0,                  # partition
    [                   # messages
        'The first message',
        'The second message',
        'The third message',
    ]
);

The main methods and attributes of the producer request are:

The Consumer Object

The Kafka consumer API is implemented by the Kafka::Consumer class.

use Kafka::Consumer;

my $consumer = Kafka::Consumer->new( Connection => $connection );

The request methods of the consumer object are offsets() and fetch().

The offsets method returns a reference to the list of offsets of received messages.

The fetch method returns a reference to the list of received Kafka::Message objects.

use Kafka qw(
    $DEFAULT_MAX_BYTES
    $DEFAULT_MAX_NUMBER_OF_OFFSETS
    $RECEIVE_EARLIEST_OFFSET
);

# Get a list of valid offsets up to max_number before the given time
my $offsets = $consumer->offsets(
    'mytopic',                      # topic
    0,                              # partition
    $RECEIVE_EARLIEST_OFFSET,       # time
    $DEFAULT_MAX_NUMBER_OF_OFFSETS  # max_number
);
say "Received offset: $_" foreach @$offsets;

# Consuming messages
my $messages = $consumer->fetch(
    'mytopic',                      # topic
    0,                              # partition
    0,                              # offset
    $DEFAULT_MAX_BYTES              # Maximum size of MESSAGE(s) to receive
);
foreach my $message ( @$messages ) {
    if ( $message->valid ) {
        say 'payload    : ', $message->payload;
        say 'key        : ', $message->key;
        say 'offset     : ', $message->offset;
        say 'next_offset: ', $message->next_offset;
    } else {
        say 'error      : ', $message->error;
    }
}

See Kafka::Consumer for additional information and documentation about class methods and arguments.
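
The next_offset value shown above can drive a simple read loop. The following is a minimal sketch rather than part of the package: fetch_payloads and its $max_fetches parameter are hypothetical, and the loop assumes that fetch returns a reference to an empty list once no further messages are available at the requested offset.

```perl
use 5.010;
use strict;
use warnings;

# Hypothetical helper (not part of the Kafka package): reads messages from one
# topic/partition starting at $offset and returns a reference to the list of
# payloads. It relies only on the fetch() call and the Kafka::Message
# accessors shown above.
sub fetch_payloads {
    my ( $consumer, $topic, $partition, $offset, $max_bytes, $max_fetches ) = @_;
    $max_fetches //= 10;    # safety limit on the number of fetch requests

    my @payloads;
    for ( 1 .. $max_fetches ) {
        my $messages = $consumer->fetch( $topic, $partition, $offset, $max_bytes );

        # assumption: an empty list means there is nothing more to read
        last unless $messages && @$messages;

        foreach my $message ( @$messages ) {
            # stop at the first invalid message and return what was read so far
            unless ( $message->valid ) {
                warn 'error: ', $message->error, "\n";
                return \@payloads;
            }
            push @payloads, $message->payload;
            $offset = $message->next_offset;    # continue after the last message read
        }
    }
    return \@payloads;
}
```

With a live consumer this could be called as fetch_payloads( $consumer, 'mytopic', 0, 0, $DEFAULT_MAX_BYTES ).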

The Message Object

The Kafka message API is implemented by the Kafka::Message class.

if ( $message->valid ) {
    say 'payload    : ', $message->payload;
    say 'key        : ', $message->key;
    say 'offset     : ', $message->offset;
    say 'next_offset: ', $message->next_offset;
} else {
    say 'error      : ', $message->error;
}

Methods available for the Kafka::Message object:

The Exception Object

The designated class Kafka::Exception provides more detailed and structured information when an error is detected.

The following attributes are declared within Kafka::Exception: code, message.

Additional subclasses of Kafka::Exception are designed to report errors in the respective Kafka classes: Kafka::Exception::Connection, Kafka::Exception::Consumer, Kafka::Exception::IO, Kafka::Exception::Int64, Kafka::Exception::Producer.

The authors suggest using Try::Tiny's try and catch to handle exceptions while working with the Kafka package.
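
To illustrate how the subclasses listed above might be told apart inside a catch block, here is a minimal sketch. describe_kafka_error is a hypothetical helper, not part of the package; it assumes only the class names and the code and message attributes mentioned above.

```perl
use 5.010;
use strict;
use warnings;

use Scalar::Util qw( blessed );

# Hypothetical helper (not part of the Kafka package): turns whatever was
# caught into a one-line description.
sub describe_kafka_error {
    my ( $error ) = @_;

    # not a Kafka exception object: report it as-is
    return "Unexpected error: $error"
        unless blessed( $error ) && $error->isa( 'Kafka::Exception' );

    # pick the documented subclass the exception belongs to, if any
    my ( $origin ) = grep { $error->isa( "Kafka::Exception::$_" ) }
        qw( Connection Consumer IO Int64 Producer );
    $origin //= 'Kafka';

    return sprintf '%s error (%s): %s', $origin, $error->code, $error->message;
}
```

Inside a Try::Tiny catch block it could be used as: warn describe_kafka_error( $_ ), "\n";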

EXPORT

None by default.

Additional constants

Additional constants are available for import. They can be used to define certain types of parameters and to identify various error cases.

IP version

Specifies the IP protocol version used for resolving IP addresses and host names.

Compression

According to Apache Kafka documentation:

Kafka currently supports three compression codecs with the following codec numbers:

Error codes

Possible error codes (these correspond to the descriptions in %ERROR):

%ERROR contains the descriptions of possible error codes obtained via the ERROR_CODE box of the Apache Kafka Wire Format protocol response.

An Example

use 5.010;
use strict;
use warnings;

use Scalar::Util qw(
    blessed
);
use Try::Tiny;

use Kafka qw(
    $KAFKA_SERVER_PORT
    $REQUEST_TIMEOUT
    $RECEIVE_EARLIEST_OFFSET
    $DEFAULT_MAX_NUMBER_OF_OFFSETS
    $DEFAULT_MAX_BYTES
);
use Kafka::Connection;
use Kafka::Producer;
use Kafka::Consumer;

my ( $connection, $producer, $consumer );
try {

    #-- Connection
    $connection = Kafka::Connection->new( host => 'localhost' );

    #-- Producer
    $producer = Kafka::Producer->new( Connection => $connection );

    # Sending a single message
    $producer->send(
        'mytopic',                      # topic
        0,                              # partition
        'Single message'                # message
    );

    # Sending a series of messages
    $producer->send(
        'mytopic',                      # topic
        0,                              # partition
        [                               # messages
            'The first message',
            'The second message',
            'The third message',
        ]
    );

    #-- Consumer
    $consumer = Kafka::Consumer->new( Connection => $connection );

    # Get a list of valid offsets up to max_number before the given time
    my $offsets = $consumer->offsets(
        'mytopic',                      # topic
        0,                              # partition
        $RECEIVE_EARLIEST_OFFSET,       # time
        $DEFAULT_MAX_NUMBER_OF_OFFSETS  # max_number
    );

    if ( @$offsets ) {
        say "Received offset: $_" foreach @$offsets;
    } else {
        warn "Error: Offsets are not received\n";
    }

    # Consuming messages
    my $messages = $consumer->fetch(
        'mytopic',                      # topic
        0,                              # partition
        0,                              # offset
        $DEFAULT_MAX_BYTES              # Maximum size of MESSAGE(s) to receive
    );

    if ( $messages ) {
        foreach my $message ( @$messages ) {
            if ( $message->valid ) {
                say 'payload    : ', $message->payload;
                say 'key        : ', $message->key;
                say 'offset     : ', $message->offset;
                say 'next_offset: ', $message->next_offset;
            } else {
                say 'error      : ', $message->error;
            }
        }
    }

} catch {
    my $error = $_;
    if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
        warn 'Error: (', $error->code, ') ',  $error->message, "\n";
        exit;
    } else {
        die $error;
    }
};

# Closes and cleans up
undef $consumer;
undef $producer;
$connection->close;
undef $connection;

DEPENDENCIES

In order to install and use this package you will need Perl version 5.10 or later. Some modules within this package depend on other packages that are distributed separately from Perl. We recommend that you have the following packages installed before you install Kafka:

Compress::Snappy
Compress::LZ4Frame
Const::Fast
Data::Compare
Data::HexDump::Range
Data::Validate::Domain
Data::Validate::IP
Exception::Class
List::Utils
Params::Util
Scalar::Util::Numeric
String::CRC32
Sys::SigAction
Try::Tiny

The Kafka package has the following optional dependencies:

Capture::Tiny
Clone
Config::IniFiles
File::HomeDir
Proc::Daemon
Proc::ProcessTable
Sub::Install
Test::Deep
Test::Exception
Test::NoWarnings
Test::TCP

If the optional modules are missing, some "prereq" tests are skipped.

DIAGNOSTICS

Debug output can be enabled by setting a debug level in the PERL_KAFKA_DEBUG environment variable, in one of the following forms:

PERL_KAFKA_DEBUG=1 - debug is enabled for the whole Kafka package.

PERL_KAFKA_DEBUG=IO:1 - debug is enabled for Kafka::IO only.

PERL_KAFKA_DEBUG=Connection:1 - debug is enabled for Kafka::Connection only.

Different debug levels can be set for different modules, as in the following example:

PERL_KAFKA_DEBUG=Connection:1,IO:2

See the documentation of a particular module for an explanation of its debug levels.
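
The value format described above (a bare level, or comma-separated Module:level pairs) can be illustrated with a small sketch. This is not the package's own parser: parse_debug_setting and the '*' key used for the package-wide level are hypothetical names chosen for this example.

```perl
use 5.010;
use strict;
use warnings;

# Hypothetical illustration (not the package's own parser): splits a
# PERL_KAFKA_DEBUG-style value into per-module levels. A bare number is
# recorded under the key '*' to stand for "the whole package".
sub parse_debug_setting {
    my ( $value ) = @_;

    my %level_for;
    foreach my $item ( split /,/, $value // '' ) {
        if ( $item =~ /\A(\w+):(\d+)\z/ ) {
            $level_for{ $1 } = $2;      # e.g. 'IO:2'
        } elsif ( $item =~ /\A(\d+)\z/ ) {
            $level_for{ '*' } = $1;     # e.g. '1'
        }
    }
    return \%level_for;
}

my $levels = parse_debug_setting( 'Connection:1,IO:2' );
say "$_ => $levels->{$_}" for sort keys %$levels;
```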

BUGS AND LIMITATIONS

Connection constructor:

Make sure that you always connect to brokers using EXACTLY the same address or host name as specified in the broker configuration (host.name in server.properties). Avoid relying on the default value (when host.name is commented out) in server.properties; always set an explicit value instead.

Producer and Consumer methods only work with one topic and one partition at a time. The module also does not implement the Offset Commit/Fetch API.
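
Because each send call addresses a single topic and partition, writing to several partitions means one call per partition. A minimal sketch of that pattern follows; send_by_partition is a hypothetical helper, not part of the package, and it uses only the send( topic, partition, messages ) form shown in the examples above.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of the Kafka package): sends messages grouped
# by partition, issuing one send() call per partition. %$messages_for maps a
# partition number to an array reference of messages.
sub send_by_partition {
    my ( $producer, $topic, $messages_for ) = @_;

    my $calls = 0;
    foreach my $partition ( sort { $a <=> $b } keys %$messages_for ) {
        my $messages = $messages_for->{ $partition };
        next unless @$messages;    # nothing to send for this partition
        $producer->send( $topic, $partition, $messages );
        ++$calls;
    }
    return $calls;    # number of send() calls made
}
```

For example, send_by_partition( $producer, 'mytopic', { 0 => [ 'first', 'second' ], 1 => [ 'third' ] } ) would issue two send calls.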

String arguments passed to Producer, Consumer, and Connection methods must be binary strings. Using Unicode strings may cause an error or data corruption.
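
One common way to satisfy the binary string requirement is to encode text explicitly before sending and decode it after fetching. The sketch below uses the core Encode module and assumes, purely for illustration, that the topic carries UTF-8 text; the variable names are not part of the package.

```perl
use strict;
use warnings;

use Encode qw( encode_utf8 decode_utf8 );

# A Perl character (Unicode) string; \x{fc} is u-umlaut, \x{df} is sharp s
my $text = "Gr\x{fc}\x{df}e";

# Encode it to a binary (byte) string before handing it to the package
my $bytes = encode_utf8( $text );

# After fetching, decode the payload bytes back into characters
my $round_trip = decode_utf8( $bytes );

printf "%d characters, %d bytes\n", length( $text ), length( $bytes );
# prints "5 characters, 7 bytes"
```

The encoded value is what would be passed on, for example $producer->send( 'mytopic', 0, $bytes ).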

This module does not support Kafka protocol versions earlier than 0.8.

Kafka::IO->new uses Sys::SigAction and alarm() to limit some internal operations. This means that if an external alarm() was set, signal delivery may be delayed.

With a non-empty timeout, alarm() is used internally in Kafka::IO, and an existing alarm() is preserved where possible. However, if Time::HiRes::ualarm() is set before calling Kafka modules, its behaviour is unspecified (that is, it may be reset or preserved).

For gethostbyname operations, a non-empty timeout is rounded up to the nearest positive integer; any timeout of less than 1 second is rounded up to 1 second.
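
The rounding rule can be sketched as follows. This is an illustration only, not the package's own code: resolver_timeout is a hypothetical name, and the sketch assumes that the rounding described above is a ceiling with a minimum of 1 second.

```perl
use strict;
use warnings;

use POSIX qw( ceil );

# Hypothetical illustration (not the package's own code) of the rounding
# applied to name resolution timeouts.
sub resolver_timeout {
    my ( $timeout ) = @_;
    return unless defined $timeout;    # undef: no alarm()-based limit at all

    my $rounded = ceil( $timeout );    # round fractional seconds up
    return $rounded < 1 ? 1 : $rounded;
}

printf "%-4s -> %s\n", $_, resolver_timeout( $_ ) for 0.3, 1, 1.5, 2.01;
```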

You can disable the use of alarm() by setting timeout => undef in the constructor.

The Kafka package was written, tested, and found working on recent Linux distributions.

There are no known bugs in this package.

Please report problems to the "AUTHOR".

Patches are welcome.

MORE DOCUMENTATION

All modules contain detailed information on the interfaces they provide.

SEE ALSO

The basic operation of the Kafka package modules:

Kafka - constants and messages used by the Kafka package modules.

Kafka::Connection - interface to connect to a Kafka cluster.

Kafka::Producer - interface for producing client.

Kafka::Consumer - interface for consuming client.

Kafka::Message - interface to access Kafka message properties.

Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems.

Kafka::Protocol - functions to process messages in the Apache Kafka's Protocol.

Kafka::IO - low-level interface for communication with Kafka server.

Kafka::Exceptions - module designated to handle Kafka exceptions.

Kafka::Internals - internal constants and functions used by several package modules.

A wealth of detail about Apache Kafka and the Kafka Protocol:

Main page at http://kafka.apache.org/

Kafka Protocol at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

SOURCE CODE

The Kafka package is hosted on GitHub: https://github.com/TrackingSoft/Kafka

AUTHOR

Sergey Gladkov

Please use the GitHub project link above to report problems or contact the authors.

CONTRIBUTORS

Alexander Solovey

Jeremy Jordan

Sergiy Zuban

Nikolay Shulyakovskiy

Vlad Marchenko

Damien Krotkine

Greg Franklin

COPYRIGHT AND LICENSE

Copyright (C) 2012-2017 by TrackingSoft LLC.

This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.