Deprecation notice
This library has been deprecated in favour of php-kafka-lib.
It will still receive bugfixes until end of 10/2020, after that it will be marked as abandoned on packagist.
Please check for the migration path.
Generic php messaging library Supports:
- Kafka
This is a convenience wrapper for arnaud-lb/php-rdkafka
Avro support relies on flix-tech/avro-serde-php
To read more about the functions used in this lib, check out the documentation of the extension.
- php: ^7.1
- ext-rdkafka: ^4.0.0
composer require jobcloud/messaging-lib "~4.0"
Simple example
use Jobcloud\Messaging\Kafka\Message\KafkaProducerMessage;
use \Jobcloud\Messaging\Kafka\Producer\KafkaProducerBuilder;
$producer = KafkaProducerBuilder::create()
$message = KafkaProducerMessage::create('test-topic', 0)
->withBody('some test message payload')
->withHeaders([ 'key' => 'value' ]);
Avro Producer
To create an avro prodcuer add the avro encoder.
use FlixTech\AvroSerializer\Objects\RecordSerializer;
use Jobcloud\Messaging\Kafka\Message\KafkaProducerMessage;
use Jobcloud\Messaging\Kafka\Message\Encoder\AvroEncoder;
use Jobcloud\Messaging\Kafka\Message\Registry\AvroSchemaRegistry;
use \Jobcloud\Messaging\Kafka\Producer\KafkaProducerBuilder;
use \Jobcloud\Messaging\Kafka\Message\KafkaAvroSchema;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use GuzzleHttp\Client;
$cachedRegistry = new CachedRegistry(
new BlockingRegistry(
new PromisingRegistry(
new Client(['base_uri' => 'jobcloud-kafka-schema-registry:9081'])
new AvroObjectCacheAdapter()
$registry = new AvroSchemaRegistry($cachedRegistry);
$recordSerializer = new RecordSerializer($cachedRegistry);
//if no version is defined, latest version will be used
//if no schema definition is defined, the appropriate version will be fetched form the registry
new KafkaAvroSchema('schemaName' /*, int $version, AvroSchema $definition */)
$encoder = new AvroEncoder($registry, $recordSerializer);
$producer = KafkaProducerBuilder::create()
$schemaName = 'testSchema';
$version = 1;
$message = KafkaProducerMessage::create('test-topic', 0)
->withBody(['name' => 'someName'])
->withHeaders([ 'key' => 'value' ]);
NOTE: To improve producer latency you can install the pcntl
The messaging-lib already has code in place, similarly described here:
Kafka High Level
use \Jobcloud\Messaging\Consumer\ConsumerException;
use \Jobcloud\Messaging\Kafka\Consumer\KafkaConsumerBuilder;
use Jobcloud\Messaging\Kafka\Exception\KafkaConsumerEndOfPartitionException;
use Jobcloud\Messaging\Kafka\Exception\KafkaConsumerTimeoutException;
$consumer = KafkaConsumerBuilder::create()
'compression.codec' => 'lz4',
'' => 500
->withTimeout(120 * 10000)
while (true) {
try {
$message = $consumer->consume();
// your business logic
} catch (KafkaConsumerTimeoutException $e) {
//no messages were read in a given time
} catch (KafkaConsumerEndOfPartitionException $e) {
//only occurs if enable.partition.eof is true (default: false)
} catch (ConsumerException $e) {
// Failed
Kafka Low Level
use \Jobcloud\Messaging\Consumer\ConsumerException;
use \Jobcloud\Messaging\Kafka\Consumer\KafkaConsumerBuilder;
use Jobcloud\Messaging\Kafka\Exception\KafkaConsumerEndOfPartitionException;
use Jobcloud\Messaging\Kafka\Exception\KafkaConsumerTimeoutException;
$consumer = KafkaConsumerBuilder::create()
'compression.codec' => 'lz4',
'' => 500
->withTimeout(120 * 10000)
while (true) {
try {
$message = $consumer->consume();
// your business logic
} catch (KafkaConsumerTimeoutException $e) {
//no messages were read in a given time
} catch (KafkaConsumerEndOfPartitionException $e) {
//only occurs if enable.partition.eof is true (default: false)
} catch (ConsumerException $e) {
// Failed
Avro Consumer
To create an avro consumer add the avro decoder.
use FlixTech\AvroSerializer\Objects\RecordSerializer;
use Jobcloud\Messaging\Consumer\ConsumerException;
use \Jobcloud\Messaging\Kafka\Consumer\KafkaConsumerBuilder;
use Jobcloud\Messaging\Kafka\Exception\KafkaConsumerEndOfPartitionException;
use Jobcloud\Messaging\Kafka\Exception\KafkaConsumerTimeoutException;
use Jobcloud\Messaging\Kafka\Message\Decoder\AvroDecoder;
use Jobcloud\Messaging\Kafka\Message\KafkaAvroSchema;
use Jobcloud\Messaging\Kafka\Message\Registry\AvroSchemaRegistry;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use GuzzleHttp\Client;
$cachedRegistry = new CachedRegistry(
new BlockingRegistry(
new PromisingRegistry(
new Client(['base_uri' => 'jobcloud-kafka-schema-registry:9081'])
new AvroObjectCacheAdapter()
$registry = new AvroSchemaRegistry($cachedRegistry);
$recordSerializer = new RecordSerializer($cachedRegistry);
//if no version is defined, latest version will be used
//if no schema definition is defined, the appropriate version will be fetched form the registry
new KafkaAvroSchema('someSchema' , 9 /* , AvroSchema $definition */)
$decoder = new AvroDecoder($registry, $recordSerializer);
$consumer = KafkaConsumerBuilder::create()
'compression.codec' => 'lz4',
'' => 500
->withTimeout(120 * 10000)
while (true) {
try {
$message = $consumer->consume();
// your business logic
} catch (KafkaConsumerTimeoutException $e) {
//no messages were read in a given time
} catch (KafkaConsumerEndOfPartitionException $e) {
//only occurs if enable.partition.eof is true (default: false)
} catch (ConsumerException $e) {
// Failed
use \Jobcloud\Messaging\Producer\ProducerPool;
use \Jobcloud\Messaging\Producer\ProducerInterface;
/** @var ProducerInterface $someKafkaProducer */
/** @var ProducerInterface $someRabbitMQProducer */
$pool = new ProducerPool();
$message = KafkaMessage::create('test-topic', 0)
->withBody('some test content')
->withHeaders([ 'key' => 'value' ]);