[![Join the chat at https://gitter.im/arnaud-lb/php-rdkafka](https://badges.gitter.im/arnaud-lb/php-rdkafka.svg)](https://gitter.im/arnaud-lb/php-rdkafka?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
PHP-rdkafka is a **stable**, **production-ready**, **long term support**, and **fast** Kafka client for PHP based on [librdkafka](https://github.com/edenhill/librdkafka).
It supports PHP 7 and PHP 8 (and PHP 5 in older versions), all librdkafka versions since 0.11, and all Kafka versions since 0.8. This makes the extension **easy to deploy** in production.
The goal of the extension is to be a low-level un-opinionated librdkafka binding focused on production and long term support.
The high-level and low-level *consumers*, *producer*, and *metadata* APIs are supported.
Documentation is available [here](https://arnaud-lb.github.io/php-rdkafka/phpdoc/book.rdkafka.html).
<ul>
<li>True Serverless Kafka with per-request-pricing</li>
<li>Managed Apache Kafka, works with all Kafka clients</li>
<li>Built-in REST API designed for serverless and edge functions</li>
</ul>

[Start for free in 30 seconds!](https://upstash.com/?utm_source=php-rdkafka)
php-rdkafka supports Ukraine. Proceeds from our generous sponsors are currently donated to the [Support Ukraine collective](https://opencollective.com/support-ukraine).
The configuration parameters used below can be found in the [Librdkafka Configuration reference](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
### Producing
#### Creating a producer
For producing, we first need to create a producer, and to add brokers (Kafka
servers) to it:
``` php
<?php

$conf = new RdKafka\Conf();
$conf->set('log_level', (string) LOG_DEBUG);
$conf->set('debug', 'all');

$rk = new RdKafka\Producer($conf);
$rk->addBrokers("10.0.0.1:9092,10.0.0.2:9092");
```
#### Producing messages
> **Warning** Make sure that your producer follows proper shutdown (see below) to not lose messages.
Next, we create a topic instance from the producer:
``` php
<?php

$topic = $rk->newTopic("test");
```
From there, we can produce as many messages as we want, using the `produce()` method:
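``` php
<?php

$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");
```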
The first argument is the partition. `RD_KAFKA_PARTITION_UA` stands for
*unassigned*, and lets librdkafka choose the partition.

The second argument is the message flags; it should be either `0`
or `RD_KAFKA_MSG_F_BLOCK` to make `produce()` block when the queue is full.

The message payload can be anything.
#### Proper shutdown
Before destroying a producer instance, call `flush()` to make sure all queued
and in-flight produce requests are completed before terminating.
Use a reasonable value for `$timeout_ms`.
> **Warning** Not calling flush can lead to message loss!
```php
$rk->flush($timeout_ms);
```
In case you don't care about sending messages that haven't been sent yet,
you can use `purge()` before calling `flush()`:
```php
// Forget messages that are not fully sent yet
$rk->purge(RD_KAFKA_PURGE_F_QUEUE);
$rk->flush($timeout_ms);
```
### High-level consuming
The `RdKafka\KafkaConsumer` class supports automatic partition assignment/revocation. See the example [here](https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.examples.html#example-1).
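A minimal sketch of the high-level consumer; the broker address, group id, and topic name below are placeholder assumptions:

``` php
<?php

$conf = new RdKafka\Conf();
$conf->set('group.id', 'myConsumerGroup');       // assumption: any consumer group name
$conf->set('metadata.broker.list', '127.0.0.1'); // assumption: local broker
$conf->set('auto.offset.reset', 'earliest');

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['test']);

while (true) {
    $message = $consumer->consume(120 * 1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
    }
}
```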
### Low-level consuming (legacy)
> **Note** The low-level consumer is a legacy API, please prefer using the high-level consumer
We first need to create a low level consumer, and to add brokers (Kafka
servers) to it:
``` php
<?php

$conf = new RdKafka\Conf();
$conf->set('log_level', (string) LOG_DEBUG);
$conf->set('debug', 'all');

$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("10.0.0.1,10.0.0.2");
```
Next, create a topic instance by calling the `newTopic()` method, and start
consuming on partition 0:
``` php
<?php

$topic = $rk->newTopic("test");

// The first argument is the partition to consume from.
// The second argument is the offset at which to start consumption. Valid values
// are: RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
```
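From there, messages can be fetched with `consume()`; a minimal sketch:

``` php
<?php

while (true) {
    // The first argument is the partition (again).
    // The second argument is the timeout in milliseconds.
    $msg = $topic->consume(0, 1000);
    if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
        // No message, or end of partition reached: keep polling.
        continue;
    } elseif ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $msg->payload, "\n";
    }
}
```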
### queued.max.messages.kbytes

librdkafka will buffer up to 1GB of messages for each consumed partition by default. You can lower memory usage by reducing the value of the ``queued.max.messages.kbytes`` parameter on your consumers.
### topic.metadata.refresh.sparse and topic.metadata.refresh.interval.ms
Each consumer and producer instance will fetch topics metadata at an interval defined by the ``topic.metadata.refresh.interval.ms`` parameter. Depending on your librdkafka version, the parameter defaults to 10 seconds, or 600 seconds.
librdkafka fetches the metadata for all topics of the cluster by default. Setting ``topic.metadata.refresh.sparse`` to the string ``"true"`` ensures that librdkafka fetches only the topics it uses.

Setting ``topic.metadata.refresh.sparse`` to ``"true"``, and ``topic.metadata.refresh.interval.ms`` to 600 seconds (plus some jitter), can significantly reduce bandwidth usage, depending on the number of consumers and topics.
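A sketch of these two settings; the values are illustrative, and note that `set()` expects string values:

``` php
<?php

$conf = new RdKafka\Conf();
$conf->set('topic.metadata.refresh.sparse', 'true');
// 600 seconds, expressed in milliseconds:
$conf->set('topic.metadata.refresh.interval.ms', (string) (600 * 1000));
```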
### internal.termination.signal
This setting allows librdkafka threads to terminate as soon as librdkafka is done with them. This effectively allows your PHP processes / requests to terminate quickly.
When enabling this, you have to mask the signal like this:
``` php
<?php

// once
pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
// any time
$conf->set('internal.termination.signal', SIGIO);
```
### socket.blocking.max.ms (librdkafka < 1.0.0)
> Maximum time a broker socket operation may block. A lower value improves responsiveness at the expense of slightly higher CPU usage.
Reducing the value of this setting improves shutdown speed. The value defines the maximum time librdkafka will block in one iteration of a read loop. This also defines how often the main librdkafka thread will check for termination.
### queue.buffering.max.ms
This defines how long librdkafka will wait, at most, before sending a batch of messages. Reducing this setting to e.g. 1ms ensures that messages are sent ASAP, instead of being batched.

This has been seen to reduce the shutdown time of the rdkafka instance, and of the PHP process / request.
## Performance / Low-latency settings
Here is a configuration optimized for low latency. This allows a PHP process / request to send messages ASAP and to terminate quickly.
``` php
<?php

$conf = new \RdKafka\Conf();
$conf->set('socket.timeout.ms', 50); // or socket.blocking.max.ms, depending on librdkafka version
if (function_exists('pcntl_sigprocmask')) {
    pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
    $conf->set('internal.termination.signal', SIGIO);
} else {
    $conf->set('queue.buffering.max.ms', 1);
}

$producer = new \RdKafka\Producer($conf);
$consumer = new \RdKafka\Consumer($conf);
```
It is advised to call `poll()` at regular intervals to serve callbacks. In `php-rdkafka:3.x`,
`poll()` was also called during shutdown, so not calling it at regular intervals might
lead to a slightly longer shutdown. The example below polls until there are no more events in the queue:
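A minimal sketch, assuming a `$producer` instance as created above:

``` php
<?php

while ($producer->getOutQLen() > 0) {
    $producer->poll(1);
}
```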
The source of the documentation can be found [here](https://github.com/arnaud-lb/php-rdkafka-doc)
## Asking for Help
If the documentation is not enough, feel free to ask questions on the php-rdkafka channels on [Gitter](https://gitter.im/arnaud-lb/php-rdkafka) or [Google Groups](https://groups.google.com/forum/#!forum/php-rdkafka).
## Stubs
Because your IDE is not able to auto-discover the php-rdkafka API, you can consider using an external package that provides a set of stubs for php-rdkafka classes, functions, and constants: [kwn/php-rdkafka-stubs](https://github.com/kwn/php-rdkafka-stubs).
## Contributing
If you would like to contribute, thank you :)
Before you start, please take a look at the [CONTRIBUTING document](https://github.com/arnaud-lb/php-rdkafka/blob/master/CONTRIBUTING.md) to see how to get your changes merged in.
## Credits
Documentation copied from [librdkafka](https://github.com/edenhill/librdkafka).
Authors: see [contributors](https://github.com/arnaud-lb/php-rdkafka/graphs/contributors).
## License
php-rdkafka is released under the [MIT](https://github.com/arnaud-lb/php-rdkafka/blob/master/LICENSE) license.
# An open-source project by [![Conduktor.io](https://www.conduktor.io/uploads/conduktor.svg)](https://conduktor.io/)
This project is sponsored by [Conduktor.io](https://www.conduktor.io/), a graphical desktop user interface for Apache Kafka.
Once you have started your cluster, you can use Conduktor to easily manage it.
Just connect against `localhost:9092`. If you are on Mac or Windows and want to connect from another container, use `host.docker.internal:29092`.
# kafka-stack-docker-compose
This replicates, as closely as possible, real deployment configurations, where your Zookeeper servers and Kafka servers are actually all distinct from each other. This solves all the networking hurdles that come with Docker and docker-compose, and is compatible cross-platform.
**UPDATE**: No /etc/hosts file changes are necessary anymore. Explanations at: https://rmoff.net/2018/08/02/kafka-listeners-explained/
## Stack version
- Zookeeper version: 3.6.3 (Confluent 7.2.0)
- Kafka version: 3.2.0 (Confluent 7.2.0)
- Kafka Schema Registry: Confluent 7.2.0
- Kafka Rest Proxy: Confluent 7.2.0
- Kafka Connect: Confluent 7.2.0
- ksqlDB Server: Confluent 7.2.0
- Zoonavigator: 1.1.1
For a UI tool to access your local Kafka cluster, use the free version of [Conduktor](https://www.conduktor.io/download)
# Requirements
Kafka will be exposed on `127.0.0.1` or `DOCKER_HOST_IP` if set in the environment.
(You probably don't need to set it if you're not using Docker-Toolbox)
## Docker-Toolbox
Docker Toolbox is [deprecated](https://github.com/docker-archive/toolbox) and has not been maintained for several years.
We can't guarantee this stack will work with Docker Toolbox, but if you want to try anyway, please export your environment before starting the stack:
```
export DOCKER_HOST_IP=192.168.99.100
```
(your docker machine IP is usually `192.168.99.100`)
## Apple M1 support
Confluent Platform supports Apple M1 (ARM64) since version `7.2.0`, so this stack will work out of the box.

If you want to downgrade the Confluent Platform version, there are two ways:

1. Add `platform: linux/amd64` to the service definition, as shown in the sketch after this list. It will work because Docker is able to emulate AMD64 instructions.
2. Previous versions have been [built](https://github.com/arm64-compat/confluent-platform) for ARM64 by the community. If you want to use them, just change the image in the corresponding yml. Since these are not official images, use them at your own risk.
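A minimal sketch of option 1; the service name and image tag are illustrative:

```
kafka1:
  image: confluentinc/cp-kafka:7.1.0   # hypothetical pre-7.2.0 version
  platform: linux/amd64                # force x86-64 emulation on Apple Silicon
```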
## Single Zookeeper / Single Kafka
This configuration fits most development requirements.
- Zookeeper will be available at `$DOCKER_HOST_IP:2181`
- Kafka will be available at `$DOCKER_HOST_IP:9092`
- (experimental) JMX port at `$DOCKER_HOST_IP:9999`
Run with:
```
docker-compose -f zk-single-kafka-single.yml up
docker-compose -f zk-single-kafka-single.yml down
```
## Single Zookeeper / Multiple Kafka
Use this configuration if you want three brokers and to experiment with Kafka replication / fault-tolerance.
- Zookeeper will be available at `$DOCKER_HOST_IP:2181`
- Kafka will be available at `$DOCKER_HOST_IP:9092,$DOCKER_HOST_IP:9093,$DOCKER_HOST_IP:9094`
Run with:
```
docker-compose -f zk-single-kafka-multiple.yml up
docker-compose -f zk-single-kafka-multiple.yml down
```
## Multiple Zookeeper / Single Kafka
Use this configuration if you want three Zookeeper nodes and to experiment with Zookeeper fault-tolerance.
- Zookeeper will be available at `$DOCKER_HOST_IP:2181,$DOCKER_HOST_IP:2182,$DOCKER_HOST_IP:2183`
- Kafka will be available at `$DOCKER_HOST_IP:9092`
- (experimental) JMX port at `$DOCKER_HOST_IP:9999`
Run with:
```
docker-compose -f zk-multiple-kafka-single.yml up
docker-compose -f zk-multiple-kafka-single.yml down
```
## Multiple Zookeeper / Multiple Kafka
Use this configuration if you want three Zookeeper nodes and three Kafka brokers to experiment with a production-like setup.
- Zookeeper will be available at `$DOCKER_HOST_IP:2181,$DOCKER_HOST_IP:2182,$DOCKER_HOST_IP:2183`
- Kafka will be available at `$DOCKER_HOST_IP:9092,$DOCKER_HOST_IP:9093,$DOCKER_HOST_IP:9094`
Run with:
```
docker-compose -f zk-multiple-kafka-multiple.yml up
docker-compose -f zk-multiple-kafka-multiple.yml down
```
## Full stack
Need a UI? We recommend using [Conduktor](https://conduktor.io) as your tool to bring a unified UI to all these components.
- Single Zookeeper: `$DOCKER_HOST_IP:2181`
- Single Kafka: `$DOCKER_HOST_IP:9092`
- Kafka Schema Registry: `$DOCKER_HOST_IP:8081`
- Kafka Rest Proxy: `$DOCKER_HOST_IP:8082`
- Kafka Connect: `$DOCKER_HOST_IP:8083`
- KSQL Server: `$DOCKER_HOST_IP:8088`
- Zoonavigator Web: `$DOCKER_HOST_IP:8004`
- (experimental) JMX port at `$DOCKER_HOST_IP:9999`
Run with:
```
docker-compose -f full-stack.yml up
docker-compose -f full-stack.yml down
```
# FAQ
## Kafka
**Q: Kafka's log is too verbose, how can I reduce it?**
A: Add the following line to your docker-compose environment variables: `KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"`. Full logging control can be accessed here: https://github.com/confluentinc/cp-docker-images/blob/master/debian/kafka/include/etc/confluent/docker/log4j.properties.template
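For instance, a sketch of where the variable goes (add it under the broker service's `environment` block in your compose file):

```
kafka1:
  environment:
    KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
```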
**Q: How do I delete data to start fresh?**
A: Your data is persisted within the docker-compose folder, so if you want, for example, to reset the data in the full-stack docker-compose, run `docker-compose -f full-stack.yml down`.
**Q: Can I change the zookeeper ports?**
A: Yes. Say you want to change the `zoo1` port to `12181` (only the relevant lines are shown):
```
zoo1:
  ports:
    - "12181:12181"
  environment:
    ZOO_PORT: 12181

kafka1:
  environment:
    KAFKA_ZOOKEEPER_CONNECT: "zoo1:12181"
```
**Q: Can I change the Kafka ports?**
A: Yes. Say you want to change the `kafka1` port to `12345` (only the relevant lines are shown in the sketch below). Note that only `LISTENER_DOCKER_EXTERNAL` changes:
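A sketch, assuming the yml files use the `LISTENER_DOCKER_INTERNAL` / `LISTENER_DOCKER_EXTERNAL` listener names:

```
kafka1:
  ports:
    - "12345:12345"
  environment:
    KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:12345
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
```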
**Q: Kafka is using a lot of disk space for testing. Can I reduce it?**
A: Yes. This is for testing only! Reduce `KAFKA_LOG_SEGMENT_BYTES` to 16MB and `KAFKA_LOG_RETENTION_BYTES` to 128MB:
```
kafka1:
  image: confluentinc/cp-kafka:7.2.0
  ...
  environment:
    ...
    # For testing: small segments of 16MB and retention of 128MB
    KAFKA_LOG_SEGMENT_BYTES: 16777216
    KAFKA_LOG_RETENTION_BYTES: 134217728
```
**Q: How do I expose kafka?**
A: If you want to expose Kafka outside of your local machine, you must set `KAFKA_ADVERTISED_LISTENERS` to advertise the machine's IP so that Kafka is externally accessible. To achieve this, set the `LISTENER_DOCKER_EXTERNAL` listener to the IP of the machine.
For example, if the IP of your machine is `50.10.2.3`, follow the sample mapping below:
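A sketch, assuming the machine's IP is `50.10.2.3` and the yml files use the listener names above:

```
kafka1:
  environment:
    KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://50.10.2.3:9092
```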
Finally, here is an excerpt of a shell script for smoke-testing the stack; it installs a bash `trap` that tears the stack down and reports the failing command whenever an error occurs:

```
f() {
    errcode=$? # save the exit code as the first thing done in the trap function
    echo "error $errcode"
    echo "the command executing at the time of the error was"
    echo "$BASH_COMMAND"
    echo "on line ${BASH_LINENO[0]}"
    # do some error handling, cleanup, logging, notification
    # $BASH_COMMAND contains the command that was being executed at the time of the trap
    # ${BASH_LINENO[0]} contains the line number in the script of that command
    # exit the script or return to try again, etc.
    # creating stack...
    docker-compose -f "$file" down
    exit $errcode # or use some other value or do return instead
}
trap f ERR

all_great() {
    # for testing
    echo "Verifying Process"
    running=$(docker-compose -f "$1" ps | grep Up | wc -l)
    if [ "$running" != "$2" ]; then
        # for logging
        docker-compose -f "$1" ps
        # debug
        docker-compose -f "$1" logs
        exit 1
    fi
}

kafka_tests() {
    echo "Testing Kafka"
    topic="testtopic"
    if grep -q kafka3 "$1"; then replication_factor="3"; else replication_factor="1"; fi
    for i in 1 2 3 4 5; do echo "trying to create test topic" && kafka-topics --create --topic "$topic" --replication-factor "$replication_factor" --partitions 12 --zookeeper "$DOCKER_HOST_IP:2181" && break || sleep 5; done
    sleep 5
    for x in {1..100}; do echo $x; done | kafka-console-producer --broker-list "$DOCKER_HOST_IP:9092" --topic "$topic"
}
```