Kafka go client


In this post we will learn how to create a Kafka producer and consumer in Go. We will also look at how to tune some configuration options to make our application production-ready.

Kafka is an open-source event streaming platform used for publishing and processing events at high throughput. There are several popular Go libraries for interfacing with Kafka. For this post, we will be using the kafka-go library (but the same concepts apply to any other library as well).

If you want to skip the explanation and see the code, you can view it on GitHub.

Getting Started

First, make sure you have a running Kafka cluster on your machine.

Before diving into the code, we should know about brokers and topics, which will be needed by both the producer and consumer.

A “topic” can be thought of as a distinct queue or channel where messages are sent.

For most production applications, there isn’t a single Kafka server running, but rather a cluster of multiple servers called “brokers”. The messages for each topic are split amongst the various brokers.

topics and brokers

To learn more about how each topic is split between the brokers, you can read the official docs.

Creating the Kafka Producer

For the purpose of illustration, let’s create a function that writes a message into the Kafka cluster every second, forever:
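A minimal sketch of such a producer using kafka-go (the broker address localhost:9092 and the topic name message-log are assumptions, not values from the original post):

package main

import (
    "context"
    "fmt"
    "log"
    "strconv"
    "time"

    "github.com/segmentio/kafka-go"
)

// produce writes a numbered message to the topic every second, forever.
func produce(ctx context.Context) {
    w := &kafka.Writer{
        Addr:  kafka.TCP("localhost:9092"),
        Topic: "message-log",
    }
    for i := 0; ; i++ {
        err := w.WriteMessages(ctx, kafka.Message{
            Key:   []byte(strconv.Itoa(i)),
            Value: []byte("this is message " + strconv.Itoa(i)),
        })
        if err != nil {
            log.Fatal("could not write message:", err)
        }
        fmt.Println("writes:", i)
        time.Sleep(time.Second)
    }
}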

Creating the Kafka Consumer

When creating a consumer, we need to specify its group ID. This is because a single topic can have multiple consumers, and the group ID ensures that consumers belonging to the same group don't receive repeated messages.

Let’s create another function that consumes messages from the Kafka cluster whenever they’re available:
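Continuing the sketch above (same imports; the group ID my-group is an assumption):

// consume reads messages from the topic as they become available, forever.
func consume(ctx context.Context) {
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "message-log",
        GroupID: "my-group",
    })
    for {
        msg, err := r.ReadMessage(ctx)
        if err != nil {
            log.Fatal("could not read message:", err)
        }
        fmt.Println("received:", string(msg.Value))
    }
}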

Now that we've defined the functions to send and receive messages, we can put it all together in the main function:
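A sketch of a main function tying the two together:

func main() {
    ctx := context.Background()
    go produce(ctx)
    consume(ctx)
}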

If you run this code, you’ll see an output similar to this:
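The exact lines depend on timing, but with the sketches above the interleaved print statements would look roughly like this:

writes: 0
writes: 1
writes: 2
writes: 3
received: this is message 0
received: this is message 1
received: this is message 2
received: this is message 3
writes: 4
...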

Something to notice here is that we receive multiple messages at a time, even though the writes take place one second apart.

This is because the Kafka client has some default settings that are great for large scale applications, but which you might want to modify if latency is a concern.

In the next section we’ll go through some configuration options that you can set to optimize the Kafka client for your needs.

Tuning Kafka Client Configuration

There are a lot of options that you can configure when creating the Kafka producer and consumer. In this section we’ll go over some of the more important configuration options, and what they mean.

You can see the full list of configuration options here

Minimum Buffered Bytes

As we’ve seen from the previous example, the data received by the consumer isn’t exactly “real time”.

The consumer polls the Kafka brokers to check if there is enough data to receive. The minimum buffered bytes defines what “enough” is. For example, if we have a configuration like this:
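A sketch of such a reader configuration (the broker, topic, and group names are assumptions; the byte values are illustrative):

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:  []string{"localhost:9092"},
    Topic:    "message-log",
    GroupID:  "my-group",
    MinBytes: 5,   // respond only once at least 5 new bytes are available
    MaxBytes: 1e6, // never return more than ~1MB in a single fetch
})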

This means that when the consumer polls the cluster for new data on the topic, the cluster will only respond if there are at least 5 new bytes of information to send.

In this example, every message is 8 bytes; when the total size of pending messages exceeds the minimum buffered bytes, they are sent together as a batch.

The maximum buffered bytes setting, on the other hand, defines the maximum quantity of data that the cluster can respond with when polled.

Setting a higher minimum buffered bytes value helps the consumer receive data in batches, which reduces the number of network round trips and the load on your system. However, if a long period of time elapses before the amount of new data crosses this value, the previous data gets stuck for that amount of time.

Max Wait Time

The max wait time setting helps mitigate the problem discussed above. It sets the maximum time to wait between receiving messages from the Kafka cluster, regardless of the minimum buffered bytes setting.

So, if we set up our reader with the following config:
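A sketch of the same reader with a maximum wait time added:

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:  []string{"localhost:9092"},
    Topic:    "message-log",
    GroupID:  "my-group",
    MinBytes: 5,
    MaxBytes: 1e6,
    MaxWait:  3 * time.Second, // return whatever is buffered after at most 3 seconds
})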

That would mean the consumer waits at most 3 seconds before receiving any new messages, even if the new messages did not cross the minimum bytes setting that we set previously.

In this example, every message is 8 bytes, and the minimum buffered bytes is set to 15 and max wait time is 3 seconds

Start Offset

When a new consumer is added to a topic, it has two options for where it wants to start consuming data from:

  1. Earliest - The consumer will start consuming data for a topic starting from the earliest message that is available.
  2. Latest - Only consume new messages that appear after the consumer has joined the cluster.

The start offset tells the consumer where to begin consuming messages from

These are defined as the FirstOffset and LastOffset constants in the kafka-go library:
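A sketch of how they are applied through the reader's StartOffset field:

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:     []string{"localhost:9092"},
    Topic:       "message-log",
    GroupID:     "my-group",
    StartOffset: kafka.LastOffset, // or kafka.FirstOffset to start from the earliest message
})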

Note that this only applies for new consumer groups. If you’ve already consumed data with the same consumer setting before, you will continue from wherever you left off.

Message Batching

So far we’ve looked at configuration on the consumer side. Let’s take a look at some producer/writer side configuration options.

Similar to the consumer, the producer also tries to send messages in batches. This is to reduce the total number of network round trips and improve efficiency in writing messages, but comes at the cost of increased overall latency.

When batching messages, we can set:

  1. Batch Size - The total number of messages that should be buffered before writing to the Kafka brokers.
  2. Batch Timeout - The maximum time before which messages are written to the brokers. That means that even if the message batch is not full, they will still be written onto the Kafka cluster once this time period has elapsed.

illustration of batching from the producer

In our code, we can set this configuration using the BatchSize and BatchTimeout options:
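A sketch with illustrative values:

w := &kafka.Writer{
    Addr:         kafka.TCP("localhost:9092"),
    Topic:        "message-log",
    BatchSize:    10,                    // buffer up to 10 messages per batch...
    BatchTimeout: 10 * time.Millisecond, // ...but never hold them back longer than 10ms
}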

If you want your writer to immediately send every message it gets, set the batch size to 1

Required Acknowledgements

When we call the WriteMessages method in our example, it blocks the code until the message is confirmed to be written. However, the definition of what "confirmed" means can be different based on your settings.

Remember, the Kafka cluster (and your topic partitions) is distributed across multiple brokers. Of these, one broker is the designated leader and the rest are followers.

Keeping this in mind, there are three modes of acknowledgement (represented by integers) when writing messages to the cluster:

  1. All brokers acknowledge that they have received the message (represented as kafka.RequireAll)
  2. Only the leading broker acknowledges that it has received the message (represented as kafka.RequireOne). The remaining brokers can still eventually receive the message, but we won't wait for them to do so.
  3. No one acknowledges receiving the message (represented as kafka.RequireNone). This is basically a fire-and-forget mode, where we don't care whether our message is received or not. It should only be used for data that you can afford to lose a bit of, but that requires high throughput.

required acknowledgements determines when your producer will consider the message as "written"

In our code, we can use the RequiredAcks option to set the required acknowledgements:
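A sketch with the strictest setting:

w := &kafka.Writer{
    Addr:         kafka.TCP("localhost:9092"),
    Topic:        "message-log",
    RequiredAcks: kafka.RequireAll, // alternatives: kafka.RequireOne, kafka.RequireNone
}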

Logging Options

In our example, we printed to the console every time a message was written or read. Sometimes we may want more information, for debugging or to learn more about our cluster.

The library comes with the option of providing a logger that can give more detailed information about the state of your Kafka brokers.

Both the reader and writer come with a Logger option that can be set to any value implementing the following interface:
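At the time of writing, that interface in kafka-go is:

type Logger interface {
    Printf(string, ...interface{})
}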

For our example, we can use the standard library's log package and assign a logger object to the reader:
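A sketch using the log package (a *log.Logger satisfies the interface via its Printf method):

logger := log.New(os.Stdout, "kafka reader: ", 0)

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers: []string{"localhost:9092"},
    Topic:   "message-log",
    GroupID: "my-group",
    Logger:  logger,
})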

and the writer:
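Again a sketch:

w := &kafka.Writer{
    Addr:   kafka.TCP("localhost:9092"),
    Topic:  "message-log",
    Logger: log.New(os.Stdout, "kafka writer: ", 0),
}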

When running the program, we can see that the output contains much more information than before:

Although detailed logging can be helpful in a non-production environment, you should be careful before using it in production, to prevent your logs from being polluted with too much information

Further Reading

This post has covered most of the important options that I personally use in a production environment, but is by no means exhaustive.

There are many more configuration options that you may find important for your use case. You can read the full list here.

For library specific options, you can see the kafka-go docs for readers and writers.

You can see the working example of the code explained here on Github.

Source: https://www.sohamkamani.com/golang/working-with-kafka/

Confluent's Golang Client for Apache Kafka™

confluent-kafka-go is Confluent's Golang client for Apache Kafka and the Confluent Platform.

Features:

  • High performance - confluent-kafka-go is a lightweight wrapper around librdkafka, a finely tuned C client.

  • Reliability - There are a lot of details to get right when writing an Apache Kafka client. We get them right in one place (librdkafka) and leverage this work across all of our clients (also confluent-kafka-python and confluent-kafka-dotnet).

  • Supported - Commercial support is offered by Confluent.

  • Future proof - Confluent, founded by the creators of Kafka, is building a streaming platform with Apache Kafka at its core. It's high priority for us that client features keep pace with core Apache Kafka and components of the Confluent Platform.

The Golang bindings provide a high-level Producer and Consumer with support for the balanced consumer groups of Apache Kafka 0.9 and above.

See the API documentation for more information.

High-level balanced consumer

import ( "fmt""github.com/confluentinc/confluent-kafka-go/kafka" ) funcmain() { c, err:=kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": "localhost", "group.id": "myGroup", "auto.offset.reset": "earliest", }) iferr!=nil { panic(err) } c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil) for { msg, err:=c.ReadMessage(-1) iferr==nil { fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) } else { // The client will automatically try to recover from all errors.fmt.Printf("Consumer error: %v (%v)\n", err, msg) } } c.Close() }

Producer

import ( "fmt""github.com/confluentinc/confluent-kafka-go/kafka" ) funcmain() { p, err:=kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"}) iferr!=nil { panic(err) } deferp.Close() // Delivery report handler for produced messagesgofunc() { fore:=rangep.Events() { switchev:=e.(type) { case*kafka.Message: ifev.TopicPartition.Error!=nil { fmt.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { fmt.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }() // Produce messages to topic (asynchronously)topic:="myTopic"for_, word:=range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} { p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(word), }, nil) } // Wait for message deliveries before shutting downp.Flush(15*) }

More elaborate examples are available in the examples directory, including how to configure the Go client for use with Confluent Cloud.

Supports recent versions of Go and librdkafka; see the repository for the exact minimum versions.

Using Go Modules

Starting with Go 1.13, you can use Go Modules to install confluent-kafka-go.

Import the package from GitHub in your code:

import"github.com/confluentinc/confluent-kafka-go/kafka"

Build your project:

If you are building for Alpine Linux (musl), -tags musl must be specified.

go build -tags musl ./

A dependency on the latest stable version of confluent-kafka-go should be automatically added to your go.mod file.

Install the client

If Go modules can't be used, we recommend that you version pin the confluent-kafka-go import to v1 using gopkg.in:

Manual install:

go get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafka

Golang import:

import"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"

librdkafka

Prebuilt librdkafka binaries are included with the Go client and librdkafka does not need to be installed separately on the build or target system. The following platforms are supported by the prebuilt librdkafka binaries:

  • Mac OSX x64
  • glibc-based Linux x64 (e.g., RedHat, Debian, CentOS, Ubuntu, etc) - without GSSAPI/Kerberos support
  • musl-based Linux x64 (Alpine) - without GSSAPI/Kerberos support

When building your application for Alpine Linux (musl libc) you must pass -tags musl to go get, go build, etc.

CGO_ENABLED must NOT be set to 0, since the Go client is based on the C library librdkafka.

If GSSAPI/Kerberos authentication support is required you will need to install librdkafka separately, see the Installing librdkafka chapter below, and then build your Go application with -tags dynamic.

Installing librdkafka

If the bundled librdkafka build is not supported on your platform, or you need a librdkafka with GSSAPI/Kerberos support, you must install librdkafka manually on the build and target system using one of the following alternatives:

  • For Debian and Ubuntu based distros, install from the standard repositories or using Confluent's Deb repository.
  • For Redhat based distros, install using Confluent's YUM repository.
  • For MacOS X, install librdkafka from Homebrew. You may also need to install pkg-config if you don't already have it: brew install librdkafka pkg-config.
  • For Alpine:
  • confluent-kafka-go is not supported on Windows.
  • For source builds, see instructions below.

Build from source:

After installing librdkafka you will need to build your Go application with -tags dynamic.

Note: If you use the master branch of the Go client, then you need to use the master branch of librdkafka.

confluent-kafka-go requires a recent version of librdkafka; see the repository for the exact minimum version.

There are two main API strands: function and channel-based.

Function-Based Consumer

Messages, errors and events are polled through the Poll() function.

Pros:

  • More direct mapping to underlying librdkafka functionality.

Cons:

  • Makes it harder to read from multiple channels, but a go-routine easily solves that (see Cons in channel-based consumer below about outdated events).
  • Slower than the channel consumer.

See examples/consumer_example

Channel-Based Consumer (deprecated)

Deprecated: The channel-based consumer is deprecated due to the channel issues mentioned below. Use the function-based consumer.

Messages, errors and events are posted on the Events() channel for the application to read.

Pros:

  • Possibly more Golang:ish
  • Makes reading from multiple channels easy
  • Fast

Cons:

  • Outdated events and messages may be consumed due to the buffering nature of channels. The extent is limited, but not remedied, by the Events channel buffer size (go.events.channel.size).

See examples/consumer_channel_example

Channel-Based Producer

Application writes messages to the ProduceChannel. Delivery reports are emitted on the Events() channel or a specified private channel.

Pros:

  • Go:ish
  • Proper channel backpressure if librdkafka internal queue is full.

Cons:

  • Double queueing: messages are first queued in the ProduceChannel (its size is configurable) and then inside librdkafka.

See examples/producer_channel_example

Function-Based Producer

Application calls Produce() to produce messages. Delivery reports are emitted on the Events() channel or a specified private channel.

Pros:

Cons:

  • Produce() is a non-blocking call; if the internal librdkafka queue is full the call will fail.
  • Somewhat slower than the channel producer.

See examples/producer_example

Apache License v2.0

KAFKA is a registered trademark of The Apache Software Foundation and has been licensed for use by confluent-kafka-go. confluent-kafka-go has no affiliation with and is not endorsed by The Apache Software Foundation.

See kafka/README

Contributions to the code, examples, documentation, et.al, are very much appreciated.

Make your changes, run gofmt, tests, etc, push your branch, create a PR, and sign the CLA.

Source: https://github.com/confluentinc/confluent-kafka-go

kafka-go

Motivations

We rely on both Go and Kafka a lot at Segment. Unfortunately, the state of the Go client libraries for Kafka at the time of this writing was not ideal. The available options were:

  • sarama, which is by far the most popular but is quite difficult to work with. It is poorly documented, the API exposes low level concepts of the Kafka protocol, and it doesn't support recent Go features like contexts. It also passes all values as pointers which causes large numbers of dynamic memory allocations, more frequent garbage collections, and higher memory usage.

  • confluent-kafka-go is a cgo based wrapper around librdkafka, which means it introduces a dependency to a C library on all Go code that uses the package. It has much better documentation than sarama but still lacks support for Go contexts.

  • goka is a more recent Kafka client for Go which focuses on a specific usage pattern. It provides abstractions for using Kafka as a message passing bus between services rather than an ordered log of events, but this is not the typical use case of Kafka for us at Segment. The package also depends on sarama for all interactions with Kafka.

This is where kafka-go comes into play. It provides both low and high level APIs for interacting with Kafka, mirroring concepts and implementing interfaces of the Go standard library to make it easy to use and integrate with existing software.

Note:

In order to better align with our newly adopted Code of Conduct, the kafka-go project has renamed our default branch to main.
For the full details of our Code Of Conduct see this document.

Migrating to 0.4

Version 0.4 introduces a few breaking changes to the repository structure which should have minimal impact on programs and should only manifest at compile time (the runtime behavior should remain unchanged).

  • Programs do not need to import compression packages anymore in order to read compressed messages from kafka. All compression codecs are supported by default.

  • Programs that used the compression codecs directly must be adapted. Compression codecs are now exposed in the compress sub-package.

  • The experimental kafka.Client API has been updated and slightly modified: the kafka.NewClient function and kafka.ClientConfig type were removed. Programs now configure the client values directly through exported fields.

  • The kafka.NewWriter function is now deprecated (along with the kafka.WriterConfig type) and will be removed when we release version 1.0. Programs should use the kafka.Writer API instead.

With 0.4, we know that we are starting to introduce a bit more complexity in the code, but the plan is to eventually converge towards a simpler and more effective API, allowing us to keep up with Kafka's ever growing feature set, and bringing a more efficient implementation to programs depending on kafka-go.

We truly appreciate everyone's input and contributions, which have made this project way more than what it was when we started it, and we're looking forward to receiving more feedback on where we should take it.

Kafka versions

kafka-go is currently compatible with a wide range of Kafka versions. While the latest versions will be working, some features available from the Kafka API may not be implemented yet.

Golang version

kafka-go is currently compatible with recent Go versions; to use it with older versions of Go, use an older release of the package.

Connection

The Conn type is the core of the kafka-go package. It wraps around a raw network connection to expose a low-level API to a Kafka server.

Here are some examples showing typical use of a connection object:

To Create Topics

By default kafka has auto.create.topics.enable set to true (KAFKA_AUTO_CREATE_TOPICS_ENABLE='true' in the wurstmeister/kafka docker image). If this value is set to true then topics will be created as a side effect of kafka.DialLeader, like so:
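A sketch of what that looks like (the broker address, topic name, and partition are assumptions):

// DialLeader creates the topic as a side effect when auto-creation is enabled.
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", "my-topic", 0)
if err != nil {
    log.Fatal("failed to dial leader:", err)
}
defer conn.Close()

conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if _, err := conn.WriteMessages(kafka.Message{Value: []byte("hello")}); err != nil {
    log.Fatal("failed to write messages:", err)
}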

If auto.create.topics.enable is set to false then you will need to create topics explicitly, like so:
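A sketch of explicit creation, which must be sent to the cluster controller (topic name and sizes are illustrative; assumes the net, strconv, and log imports):

conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
    log.Fatal(err)
}
defer conn.Close()

// Look up the controller broker and open a second connection to it.
controller, err := conn.Controller()
if err != nil {
    log.Fatal(err)
}
controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
    log.Fatal(err)
}
defer controllerConn.Close()

err = controllerConn.CreateTopics(kafka.TopicConfig{
    Topic:             "my-topic",
    NumPartitions:     1,
    ReplicationFactor: 1,
})
if err != nil {
    log.Fatal(err)
}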

To Connect To Leader Via a Non-leader Connection
To list topics
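A sketch of listing topics by reading partition metadata over a Conn:

conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
    log.Fatal(err)
}
defer conn.Close()

partitions, err := conn.ReadPartitions()
if err != nil {
    log.Fatal(err)
}

// Deduplicate: each topic appears once per partition.
topics := map[string]struct{}{}
for _, p := range partitions {
    topics[p.Topic] = struct{}{}
}
for topic := range topics {
    fmt.Println(topic)
}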

Because it is low level, the Conn type turns out to be a great building block for higher level abstractions, like the Reader for example.

Reader

A Reader is another concept exposed by the kafka-go package, which intends to make it simpler to implement the typical use case of consuming from a single topic-partition pair. A Reader also automatically handles reconnections and offset management, and exposes an API that supports asynchronous cancellations and timeouts using Go contexts.

Note that it is important to call Close() on a Reader when a process exits. The kafka server needs a graceful disconnect to stop it from continuing to attempt to send messages to the connected clients. The given example will not call Close() if the process is terminated with SIGINT (ctrl-c at the shell) or SIGTERM (as docker stop or a kubernetes restart does). This can result in a delay when a new reader on the same topic connects (e.g. new process started or new container running). Use a signal handler to close the reader on process shutdown, as sketched below.
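A sketch of a basic Reader that is closed on SIGINT/SIGTERM (broker, topic, and fetch sizes are illustrative; assumes the os/signal, syscall, and context imports):

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:   []string{"localhost:9092"},
    Topic:     "topic-A",
    Partition: 0,
    MinBytes:  10e3, // 10KB
    MaxBytes:  10e6, // 10MB
})

// Close the reader gracefully when the process is asked to stop.
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
    <-sigs
    r.Close()
    os.Exit(0)
}()

for {
    m, err := r.ReadMessage(context.Background())
    if err != nil {
        break
    }
    fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
}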

Consumer Groups

kafka-go also supports Kafka consumer groups, including broker managed offsets. To enable consumer groups, simply specify the GroupID in the ReaderConfig.

ReadMessage automatically commits offsets when using consumer groups.
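A sketch of a consumer-group reader (group and topic names are illustrative):

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers: []string{"localhost:9092"},
    GroupID: "consumer-group-id",
    Topic:   "topic-A",
})

for {
    // With GroupID set, ReadMessage also commits the offset for us.
    m, err := r.ReadMessage(context.Background())
    if err != nil {
        break
    }
    fmt.Printf("message at topic/partition/offset %v/%v/%v: %s\n",
        m.Topic, m.Partition, m.Offset, string(m.Value))
}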

There are a number of limitations when using consumer groups:

  • (*Reader).SetOffset will return an error when GroupID is set
  • (*Reader).Offset will always return -1 when GroupID is set
  • (*Reader).Lag will always return -1 when GroupID is set
  • (*Reader).ReadLag will return an error when GroupID is set
  • (*Reader).Stats will return a partition of -1 when GroupID is set
Explicit Commits

kafka-go also supports explicit commits. Instead of calling ReadMessage, call FetchMessage followed by CommitMessages.
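A sketch of the fetch-then-commit pattern, reusing a consumer-group reader r like the one above (process is a hypothetical application-side handler):

ctx := context.Background()
for {
    m, err := r.FetchMessage(ctx)
    if err != nil {
        break
    }
    process(m) // hypothetical handler; commit only after it succeeds

    if err := r.CommitMessages(ctx, m); err != nil {
        log.Fatal("failed to commit messages:", err)
    }
}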

Managing Commits

By default, CommitMessages will synchronously commit offsets to Kafka. For improved performance, you can instead periodically commit offsets to Kafka by setting CommitInterval on the ReaderConfig.
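A sketch (the one-second interval is illustrative):

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:        []string{"localhost:9092"},
    GroupID:        "consumer-group-id",
    Topic:          "topic-A",
    CommitInterval: time.Second, // flush commits to Kafka roughly once per second
})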

Writer

To produce messages to Kafka, a program may use the low-level Conn API, but the package also provides a higher level Writer type which is more appropriate to use in most cases as it provides additional features (a basic Writer is sketched after the list below):

  • Automatic retries and reconnections on errors.
  • Configurable distribution of messages across available partitions.
  • Synchronous or asynchronous writes of messages to Kafka.
  • Asynchronous cancellation using contexts.
  • Flushing of pending messages on close to support graceful shutdowns.
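A sketch of a basic single-topic Writer (broker, topic, keys, and values are illustrative):

w := &kafka.Writer{
    Addr:     kafka.TCP("localhost:9092"),
    Topic:    "topic-A",
    Balancer: &kafka.LeastBytes{},
}

err := w.WriteMessages(context.Background(),
    kafka.Message{Key: []byte("Key-A"), Value: []byte("Hello World!")},
    kafka.Message{Key: []byte("Key-B"), Value: []byte("One!")},
)
if err != nil {
    log.Fatal("failed to write messages:", err)
}

if err := w.Close(); err != nil {
    log.Fatal("failed to close writer:", err)
}
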
Writing to multiple topics

Normally, the Writer's Topic field is used to initialize a single-topic writer. By excluding that particular configuration, you are given the ability to define the topic on a per-message basis by setting Message.Topic.

NOTE: These 2 patterns are mutually exclusive. If you set the Writer's Topic, you must not also explicitly define Message.Topic on the messages you are writing. The opposite applies when you do not define a topic for the writer. The Writer will return an error if it detects this ambiguity.
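A sketch of the per-message pattern:

w := &kafka.Writer{
    Addr:     kafka.TCP("localhost:9092"),
    Balancer: &kafka.LeastBytes{},
    // No Topic set here: each message must carry its own.
}

err := w.WriteMessages(context.Background(),
    kafka.Message{Topic: "topic-A", Value: []byte("for topic A")},
    kafka.Message{Topic: "topic-B", Value: []byte("for topic B")},
)
if err != nil {
    log.Fatal("failed to write messages:", err)
}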

Compatibility with other clients
Sarama

If you're switching from Sarama and need/want to use the same algorithm for message partitioning, you can use the kafka.Hash balancer. kafka.Hash routes messages to the same partitions that Sarama's default partitioner would route to.

librdkafka and confluent-kafka-go

Use the kafka.CRC32Balancer balancer to get the same behaviour as librdkafka's default partition strategy.

Java

Use the kafka.Murmur2Balancer balancer to get the same behaviour as the canonical Java client's default partitioner. Note: the Java class allows you to directly specify the partition, which is not permitted here.

Compression

Compression can be enabled on the Writer by setting the Compression field:
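For example (snappy is chosen arbitrarily here):

w := &kafka.Writer{
    Addr:        kafka.TCP("localhost:9092"),
    Topic:       "topic-A",
    Compression: kafka.Snappy,
}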

The Reader will determine if the consumed messages are compressed by examining the message attributes. However, the package(s) for all expected codecs must be imported so that they get loaded correctly.

Note: in versions prior to 0.4, programs had to import compression packages to install codecs and support reading compressed messages from kafka. This is no longer the case and imports of the compression packages are now no-ops.

TLS Support

For a bare bones Conn type or in the Reader/Writer configs you can specify a dialer option for TLS support. If the TLS field is nil, it will not connect with TLS.
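A sketch of a TLS-enabled Dialer plugged into a Reader; the same dialer can also be used to open a Conn directly or passed in a Writer's config (assumes the crypto/tls and time imports, and a placeholder broker address):

dialer := &kafka.Dialer{
    Timeout:   10 * time.Second,
    DualStack: true,
    TLS:       &tls.Config{ /* CA pool, client certificates, etc. */ },
}

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers: []string{"broker-1:9093"},
    Topic:   "topic-A",
    Dialer:  dialer,
})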

Connection
Reader
Writer

SASL Support

You can specify an option on the Dialer to use SASL authentication. The Dialer can be used directly to open a Conn, or it can be passed to a Reader or Writer via their respective configs. If the SASLMechanism field is nil, it will not authenticate with SASL.
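A sketch of a plain SASL mechanism attached to a Dialer (credentials are placeholders; the plain sub-package is github.com/segmentio/kafka-go/sasl/plain):

mechanism := plain.Mechanism{
    Username: "username",
    Password: "password",
}

dialer := &kafka.Dialer{
    Timeout:       10 * time.Second,
    DualStack:     true,
    SASLMechanism: mechanism,
}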

SASL Authentication Types
Plain
SCRAM
Connection
Reader
Writer
Reading all messages within a time range

Testing

Subtle behavior changes in later Kafka versions have caused some historical tests to break. If you are running against a newer Kafka version, export the environment variables documented in the repository to skip those tests.

Run Kafka locally in docker

Run tests

Source: https://pkg.go.dev/github.com/segmentio/kafka-go

Package kafka provides high-level Apache Kafka producer and consumers using bindings on-top of the librdkafka C library.

High-level Consumer

* Decide if you want to read messages and events by calling `.Poll()` or the deprecated option of using the `.Events()` channel. (If you want to use `.Events()` channel then set `"go.events.channel.enable": true`).

* Create a Consumer with `kafka.NewConsumer()` providing at least the `bootstrap.servers` and `group.id` configuration properties.

* Call `.Subscribe()` or (`.SubscribeTopics()` to subscribe to multiple topics) to join the group with the specified subscription set. Subscriptions are atomic, calling `.Subscribe*()` again will leave the group and rejoin with the new set of topics.

* Start reading events and messages from either the `.Events` channel or by calling `.Poll()`.

* When the group has rebalanced each client member is assigned a (sub-)set of topic+partitions. By default the consumer will start fetching messages for its assigned partitions at this point, but your application may enable rebalance events to get an insight into what the assigned partitions were as well as set the initial offsets. To do this you need to pass `"go.application.rebalance.enable": true` to the `NewConsumer()` call mentioned above. You will (eventually) see a `kafka.AssignedPartitions` event with the assigned partition set. You can optionally modify the initial offsets (they'll default to stored offsets and if there are no previously stored offsets it will fall back to `"auto.offset.reset"` which defaults to the `latest` message) and then call `.Assign(partitions)` to start consuming. If you don't need to modify the initial offsets you will not need to call `.Assign()`, the client will do so automatically for you if you don't, unless you are using the channel-based consumer in which case you MUST call `.Assign()` when receiving the `AssignedPartitions` and `RevokedPartitions` events.

* As messages are fetched they will be made available on either the `.Events` channel or by calling `.Poll()`, look for event type `*kafka.Message`.

* Handle messages, events and errors to your liking.

* When you are done consuming call `.Close()` to commit final offsets and leave the consumer group.

Producer

* Create a Producer with `kafka.NewProducer()` providing at least the `bootstrap.servers` configuration properties.

* Messages may now be produced either by sending a `*kafka.Message` on the `.ProduceChannel` or by calling `.Produce()`.

* Producing is an asynchronous operation so the client notifies the application of per-message produce success or failure through something called delivery reports. Delivery reports are by default emitted on the `.Events()` channel as `*kafka.Message` and you should check `msg.TopicPartition.Error` for `nil` to find out if the message was successfully delivered or not. It is also possible to direct delivery reports to alternate channels by providing a non-nil `chan Event` channel to `.Produce()`. If no delivery reports are wanted they can be completely disabled by setting configuration property `"go.delivery.reports": false`.

* When you are done producing messages you will need to make sure all messages are indeed delivered to the broker (or failed), remember that this is an asynchronous client so some of your messages may be lingering in internal channels or transmission queues. To do this you can either keep track of the messages you've produced and wait for their corresponding delivery reports, or call the convenience function `.Flush()` that will block until all message deliveries are done or the provided timeout elapses.

* Finally call `.Close()` to decommission the producer.

Transactional producer API

The transactional producer operates on top of the idempotent producer, and provides full exactly-once semantics (EOS) for Apache Kafka when used with the transaction aware consumer (`isolation.level=read_committed`).

A producer instance is configured for transactions by setting the `transactional.id` to an identifier unique for the application. This id will be used to fence stale transactions from previous instances of the application, typically following an outage or crash.

After creating the transactional producer instance using `NewProducer()` the transactional state must be initialized by calling `InitTransactions()`. This is a blocking call that will acquire a runtime producer id from the transaction coordinator broker as well as abort any stale transactions and fence any still running producer instances with the same `transactional.id`.

Once transactions are initialized the application may begin a new transaction by calling `BeginTransaction()`. A producer instance may only have one single on-going transaction.

Any messages produced after the transaction has been started will belong to the ongoing transaction and will be committed or aborted atomically. It is not permitted to produce messages outside a transaction boundary, e.g., before `BeginTransaction()` or after `CommitTransaction()`, `AbortTransaction()` or if the current transaction has failed.

If consumed messages are used as input to the transaction, the consumer instance must be configured with `enable.auto.commit` set to `false`. To commit the consumed offsets along with the transaction pass the list of consumed partitions and the last offset processed + 1 to `SendOffsetsToTransaction()` prior to committing the transaction. This allows an aborted transaction to be restarted using the previously committed offsets.

To commit the produced messages, and any consumed offsets, to the current transaction, call `CommitTransaction()`. This call will block until the transaction has been fully committed or failed (typically due to fencing by a newer producer instance).

Alternatively, if processing fails, or an abortable transaction error is raised, the transaction needs to be aborted by calling `AbortTransaction()` which marks any produced messages and offset commits as aborted.

After the current transaction has been committed or aborted a new transaction may be started by calling `BeginTransaction()` again.
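A rough sketch of the happy path described above (the producer p is assumed to have been created with `transactional.id` set; offsets and meta come from the consumer whose input is being processed, and the topic name is a placeholder):

func produceInTransaction(ctx context.Context, p *kafka.Producer, topic string,
    offsets []kafka.TopicPartition, meta *kafka.ConsumerGroupMetadata) error {

    // InitTransactions is normally called once per producer instance.
    if err := p.InitTransactions(ctx); err != nil {
        return err
    }
    if err := p.BeginTransaction(); err != nil {
        return err
    }

    // Messages produced from here on belong to the ongoing transaction.
    err := p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          []byte("transactional record"),
    }, nil)
    if err != nil {
        p.AbortTransaction(ctx)
        return err
    }

    // Attach the consumed offsets to the transaction (only needed when the
    // input comes from a consumer with enable.auto.commit=false).
    if err := p.SendOffsetsToTransaction(ctx, offsets, meta); err != nil {
        p.AbortTransaction(ctx)
        return err
    }

    return p.CommitTransaction(ctx)
}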

Retriable errors: Some error cases allow the attempted operation to be retried, this is indicated by the error object having the retriable flag set which can be detected by calling `err.(kafka.Error).IsRetriable()`. When this flag is set the application may retry the operation immediately or preferably after a shorter grace period (to avoid busy-looping). Retriable errors include timeouts, broker transport failures, etc.

Abortable errors: An ongoing transaction may fail permanently due to various errors, such as transaction coordinator becoming unavailable, write failures to the Apache Kafka log, under-replicated partitions, etc. At this point the producer application must abort the current transaction using `AbortTransaction()` and optionally start a new transaction by calling `BeginTransaction()`. Whether an error is abortable or not is detected by calling `err.(kafka.Error).TxnRequiresAbort()` on the returned error object.

Fatal errors: While the underlying idempotent producer will typically only raise fatal errors for unrecoverable cluster errors where the idempotency guarantees can't be maintained, most of these are treated as abortable by the transactional producer since transactions may be aborted and retried in their entirety; The transactional producer on the other hand introduces a set of additional fatal errors which the application needs to handle by shutting down the producer and terminate. There is no way for a producer instance to recover from fatal errors. Whether an error is fatal or not is detected by calling `err.(kafka.Error).IsFatal()` on the returned error object or by checking the global `GetFatalError()`.

Handling of other errors: For errors that have neither retriable, abortable or the fatal flag set it is not always obvious how to handle them. While some of these errors may be indicative of bugs in the application code, such as when an invalid parameter is passed to a method, other errors might originate from the broker and be passed thru as-is to the application. The general recommendation is to treat these errors, that have neither the retriable or abortable flags set, as fatal.

Error handling example:

retry:
    err := producer.CommitTransaction()
    if err == nil {
        return nil
    } else if err.(kafka.Error).TxnRequiresAbort() {
        do_abort_transaction_and_reset_inputs()
    } else if err.(kafka.Error).IsRetriable() {
        goto retry
    } else {
        // treat all other errors as fatal errors
        panic(err)
    }

Events

Apart from emitting messages and delivery reports the client also communicates with the application through a number of different event types. An application may choose to handle or ignore these events.

Consumer events

* `*kafka.Message` - a fetched message.

* `AssignedPartitions` - The assigned partition set for this client following a rebalance. Requires `go.application.rebalance.enable`

* `RevokedPartitions` - The counter part to `AssignedPartitions` following a rebalance. `AssignedPartitions` and `RevokedPartitions` are symmetrical. Requires `go.application.rebalance.enable`

* `PartitionEOF` - Consumer has reached the end of a partition. NOTE: The consumer will keep trying to fetch new messages for the partition.

* `OffsetsCommitted` - Offset commit results (when `enable.auto.commit` is enabled).

Producer events

* `*kafka.Message` - delivery report for produced message. Check `.TopicPartition.Error` for delivery result.

Generic events for both Consumer and Producer

* `KafkaError` - client (error codes are prefixed with _) or broker error. These errors are normally just informational since the client will try its best to automatically recover (eventually).

* `OAuthBearerTokenRefresh` - retrieval of a new SASL/OAUTHBEARER token is required. This event only occurs with sasl.mechanism=OAUTHBEARER. Be sure to invoke SetOAuthBearerToken() on the Producer/Consumer/AdminClient instance when a successful token retrieval is completed, otherwise be sure to invoke SetOAuthBearerTokenFailure() to indicate that retrieval failed (or if setting the token failed, which could happen if an extension doesn't meet the required regular expression); invoking SetOAuthBearerTokenFailure() will schedule a new event for 10 seconds later so another retrieval can be attempted.

Hint: If your application registers a signal notification (signal.Notify), make sure the signals channel is buffered to avoid possible complications with blocking Poll() calls.

Note: The Confluent Kafka Go client is safe for concurrent use.

  • Constants
  • func LibraryVersion() (int, string)
  • func WriteErrorCodes(f *os.File)
  • type AdminClient
    • func (a *AdminClient) AlterConfigs(ctx context.Context, resources []ConfigResource, ) (result []ConfigResourceResult, err error)
    • func (a *AdminClient) Close()
    • func (a *AdminClient) ClusterID(ctx context.Context) (clusterID string, err error)
    • func (a *AdminClient) ControllerID(ctx context.Context) (controllerID int32, err error)
    • func (a *AdminClient) CreatePartitions(ctx context.Context, partitions []PartitionsSpecification, ) (result []TopicResult, err error)
    • func (a *AdminClient) CreateTopics(ctx context.Context, topics []TopicSpecification, ) (result []TopicResult, err error)
    • func (a *AdminClient) DeleteTopics(ctx context.Context, topics []string, options DeleteTopicsAdminOption) (result []TopicResult, err error)
    • func (a *AdminClient) DescribeConfigs(ctx context.Context, resources []ConfigResource, ) (result []ConfigResourceResult, err error)
    • func (a *AdminClient) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
    • func (a *AdminClient) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error
    • func (a *AdminClient) SetOAuthBearerTokenFailure(errstr string) error
    • func (a *AdminClient) String() string
  • type AdminOption
  • type AdminOptionOperationTimeout
  • type AdminOptionRequestTimeout
  • type AdminOptionValidateOnly
  • type AlterConfigsAdminOption
  • type AlterOperation
  • type AssignedPartitions
  • type BrokerMetadata
  • type ConfigEntry
  • type ConfigEntryResult
  • type ConfigMap
  • type ConfigResource
  • type ConfigResourceResult
  • type ConfigSource
  • type ConfigValue
  • type Consumer
    • func (c *Consumer) Assign(partitions []TopicPartition) (err error)
    • func (c *Consumer) Assignment() (partitions []TopicPartition, err error)
    • func (c *Consumer) AssignmentLost() bool
    • func (c *Consumer) Close() (err error)
    • func (c *Consumer) Commit() ([]TopicPartition, error)
    • func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error)
    • func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error)
    • func (c *Consumer) Committed(partitions []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)
    • func (c *Consumer) Events() chan Event
    • func (c *Consumer) GetConsumerGroupMetadata() (*ConsumerGroupMetadata, error)
    • func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
    • func (c *Consumer) GetRebalanceProtocol() string
    • func (c *Consumer) GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error)
    • func (c *Consumer) IncrementalAssign(partitions []TopicPartition) (err error)
    • func (c *Consumer) IncrementalUnassign(partitions []TopicPartition) (err error)
    • func (c *Consumer) Logs() chan LogEvent
    • func (c *Consumer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)
    • func (c *Consumer) Pause(partitions []TopicPartition) (err error)
    • func (c *Consumer) Poll(timeoutMs int) (event Event)
    • func (c *Consumer) Position(partitions []TopicPartition) (offsets []TopicPartition, err error)
    • func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
    • func (c *Consumer) ReadMessage(timeout time.Duration) (*Message, error)
    • func (c *Consumer) Resume(partitions []TopicPartition) (err error)
    • func (c *Consumer) Seek(partition TopicPartition, timeoutMs int) error
    • func (c *Consumer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error
    • func (c *Consumer) SetOAuthBearerTokenFailure(errstr string) error
    • func (c *Consumer) StoreOffsets(offsets []TopicPartition) (storedOffsets []TopicPartition, err error)
    • func (c *Consumer) String() string
    • func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
    • func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error)
    • func (c *Consumer) Subscription() (topics []string, err error)
    • func (c *Consumer) Unassign() (err error)
    • func (c *Consumer) Unsubscribe() (err error)
  • type ConsumerGroupMetadata
  • type CreatePartitionsAdminOption
  • type CreateTopicsAdminOption
  • type DeleteTopicsAdminOption
  • type DescribeConfigsAdminOption
  • type Error
  • type ErrorCode
  • type Event
  • type Handle
  • type Header
  • type LogEvent
  • type Message
  • type Metadata
  • type OAuthBearerToken
  • type OAuthBearerTokenRefresh
  • type Offset
  • type OffsetsCommitted
  • type PartitionEOF
  • type PartitionMetadata
  • type PartitionsSpecification
  • type Producer
    • func (p *Producer) AbortTransaction(ctx context.Context) error
    • func (p *Producer) BeginTransaction() error
    • func (p *Producer) Close()
    • func (p *Producer) CommitTransaction(ctx context.Context) error
    • func (p *Producer) Events() chan Event
    • func (p *Producer) Flush(timeoutMs int) int
    • func (p *Producer) GetFatalError() error
    • func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
    • func (p *Producer) InitTransactions(ctx context.Context) error
    • func (p *Producer) Len() int
    • func (p *Producer) Logs() chan LogEvent
    • func (p *Producer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)
    • func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error
    • func (p *Producer) ProduceChannel() chan *Message
    • func (p *Producer) Purge(flags int) error
    • func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
    • func (p *Producer) SendOffsetsToTransaction(ctx context.Context, offsets []TopicPartition, ) error
    • func (p *Producer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error
    • func (p *Producer) SetOAuthBearerTokenFailure(errstr string) error
    • func (p *Producer) String() string
    • func (p *Producer) TestFatalError(code ErrorCode, str string) ErrorCode
  • type RebalanceCb
  • type ResourceType
  • type RevokedPartitions
  • type Stats
  • type TimestampType
  • type TopicMetadata
  • type TopicPartition
  • type TopicPartitions
  • type TopicResult
  • type TopicSpecification
const LibrdkafkaLinkInfo = "static glibc_linux from librdkafka-static-bundle-vtgz"

LibrdkafkaLinkInfo explains how librdkafka was linked to the Go client

OffsetBeginning represents the earliest offset (logical)

OffsetEnd represents the latest offset (logical)

OffsetInvalid represents an invalid/unspecified offset

OffsetStored represents a stored offset

PartitionAny represents any partition (for partitioning), or unspecified value (for all other cases)


LibraryVersion returns the underlying librdkafka library version as a (version_int, version_str) tuple.

func WriteErrorCodes(f *os.File)

WriteErrorCodes writes Go error code constants to file from the librdkafka error codes. This function is not intended for public use.

type AdminClient struct { }

AdminClient is derived from an existing Producer or Consumer

NewAdminClient creates a new AdminClient instance with a new underlying client instance

NewAdminClientFromConsumer derives a new AdminClient from an existing Consumer instance. The AdminClient will use the same configuration and connections as the parent instance.

NewAdminClientFromProducer derives a new AdminClient from an existing Producer instance. The AdminClient will use the same configuration and connections as the parent instance.

AlterConfigs alters/updates cluster resource configuration.

Updates are not transactional so they may succeed for a subset of the provided resources while others fail. The configuration for a particular resource is updated atomically, replacing values using the provided ConfigEntrys and reverting unspecified ConfigEntrys to their default values.

Requires broker version >=

AlterConfigs will replace all existing configuration for the provided resources with the new configuration given, reverting all other configuration to their default values.

Multiple resources and resource types may be set, but at most one resource of type ResourceBroker is allowed per call since these resource requests must be sent to the broker specified in the resource.

Close an AdminClient instance.

ClusterID returns the cluster ID as reported in broker metadata.

Note on cancellation: Although the underlying C function respects the timeout, it currently cannot be manually cancelled. That means manually cancelling the context will block until the C function call returns.

Requires broker version >=

ControllerID returns the broker ID of the current controller as reported in broker metadata.

Note on cancellation: Although the underlying C function respects the timeout, it currently cannot be manually cancelled. That means manually cancelling the context will block until the C function call returns.

Requires broker version >=

CreateTopics creates topics in cluster.

The list of TopicSpecification objects define the per-topic partition count, replicas, etc.

Topic creation is non-atomic and may succeed for some topics but fail for others, make sure to check the result for topic-specific errors.

Note: TopicSpecification is analogous to NewTopic in the Java Topic Admin API.

DeleteTopics deletes a batch of topics.

This operation is not transactional and may succeed for a subset of topics while failing others. It may take several seconds after the DeleteTopics result returns success for all the brokers to become aware that the topics are gone. During this time, topic metadata and configuration may continue to return information about deleted topics.

Requires broker version >=

DescribeConfigs retrieves configuration for cluster resources.

The returned configuration includes default values, use ConfigEntryResult.IsDefault or ConfigEntryResult.Source to distinguish default values from manually configured settings.

The value of config entries where .IsSensitive is true will always be nil to avoid disclosing sensitive information, such as security settings.

Configuration entries where .IsReadOnly is true can't be modified (with AlterConfigs).

Synonym configuration entries are returned if the broker supports it (broker version >= ). See .Synonyms.

Requires broker version >=

Multiple resources and resource types may be requested, but at most one resource of type ResourceBroker is allowed per call since these resource requests must be sent to the broker specified in the resource.

GetMetadata queries broker for cluster and topic metadata. If topic is non-nil only information about that topic is returned, else if allTopics is false only information about locally used topics is returned, else information about all topics is returned. GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.

SetOAuthBearerToken sets the data to be transmitted to a broker during SASL/OAUTHBEARER authentication. It will return nil on success, otherwise an error if: 1) the token data is invalid (meaning an expiration time in the past or either a token value or an extension key or value that does not meet the regular expression requirements as per https://tools.ietf.org/html/rfc#section); 2) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; 3) SASL/OAUTHBEARER is supported but is not configured as the client's authentication mechanism.

SetOAuthBearerTokenFailure sets the error message describing why token retrieval/setting failed; it also schedules a new token refresh event for 10 seconds later so the attempt may be retried. It will return nil on success, otherwise an error if: 1) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; 2) SASL/OAUTHBEARER is supported but is not configured as the client's authentication mechanism.

String returns a human readable name for an AdminClient instance

type AdminOption interface { }

AdminOption is a generic type not to be used directly.

See CreateTopicsAdminOption et.al.

type AdminOptionOperationTimeout struct { }

AdminOptionOperationTimeout sets the broker's operation timeout, such as the timeout for CreateTopics to complete the creation of topics on the controller before returning a result to the application.

CreateTopics, DeleteTopics, CreatePartitions: a value 0 will return immediately after triggering topic creation, while > 0 will wait this long for topic creation to propagate in cluster.

Default: 0 (return immediately).

Valid for CreateTopics, DeleteTopics, CreatePartitions.

SetAdminOperationTimeout sets the broker's operation timeout, such as the timeout for CreateTopics to complete the creation of topics on the controller before returning a result to the application.

CreateTopics, DeleteTopics, CreatePartitions: a value 0 will return immediately after triggering topic creation, while > 0 will wait this long for topic creation to propagate in cluster.

Default: 0 (return immediately).

Valid for CreateTopics, DeleteTopics, CreatePartitions.

type AdminOptionRequestTimeout struct { }

AdminOptionRequestTimeout sets the overall request timeout, including broker lookup, request transmission, operation time on broker, and response.

Default: `socket.timeout.ms`.

Valid for all Admin API methods.

SetAdminRequestTimeout sets the overall request timeout, including broker lookup, request transmission, operation time on broker, and response.

Default: `socket.timeout.ms`.

Valid for all Admin API methods.

type AdminOptionValidateOnly struct { }

AdminOptionValidateOnly tells the broker to only validate the request, without performing the requested operation (create topics, etc).

Default: false.

Valid for CreateTopics, CreatePartitions, AlterConfigs

SetAdminValidateOnly tells the broker to only validate the request, without performing the requested operation (create topics, etc).

Default: false.

Valid for CreateTopics, DeleteTopics, CreatePartitions, AlterConfigs

type AlterConfigsAdminOption interface { }

AlterConfigsAdminOption - see setters.

See SetAdminRequestTimeout, SetAdminValidateOnly, SetAdminIncremental.

AlterOperation specifies the operation to perform on the ConfigEntry. Currently only AlterOperationSet.

String returns the human-readable representation of an AlterOperation

BrokerMetadata contains per-broker metadata

ConfigEntry holds parameters for altering a resource's configuration.

StringMapToConfigEntries creates a new map of ConfigEntry objects from the provided string map. The AlterOperation is set on each created entry.

String returns a human-readable representation of a ConfigEntry.

ConfigMap is a map containing standard librdkafka configuration properties as documented in: https://github.com/edenhill/librdkafka/tree/master/CONFIGURATION.md

The special property "default.topic.config" (optional) is a ConfigMap containing default topic configuration properties.

The use of "default.topic.config" is deprecated, topic configuration properties shall be specified in the standard ConfigMap. For backwards compatibility, "default.topic.config" (if supplied) takes precedence.

Get finds the given key in the ConfigMap and returns its value. If the key is not found `defval` is returned. If the key is found but the type does not match that of `defval` (unless nil) an ErrInvalidArg error is returned.

Set implements flag.Set (command line argument parser) as a convenience for `-X key=value` config.

SetKey sets configuration property key to value.

For user convenience a key prefixed with {topic}. will be set on the "default.topic.config" sub-map, this use is deprecated.

ConfigResource holds parameters for altering an Apache Kafka configuration resource

String returns a human-readable representation of a ConfigResource

ConfigSource represents an Apache Kafka config source

String returns the human-readable representation of a ConfigSource type

type ConfigValue interface{}

ConfigValue supports the following types:

bool, int, string, any type with the standard String() interface

Consumer implements a High-level Apache Kafka Consumer instance

NewConsumer creates a new high-level Consumer instance.

conf is a *ConfigMap with standard librdkafka configuration properties.

Supported special configuration properties:

  • go.application.rebalance.enable (bool, false) - Forward rebalancing responsibility to application via the Events() channel. If set to true the app must handle the AssignedPartitions and RevokedPartitions events and call Assign() and Unassign() respectively.
  • go.events.channel.enable (bool, false) - [deprecated] Enable the Events() channel. Messages and events will be pushed on the Events() channel and the Poll() interface will be disabled.
  • go.events.channel.size (int, ) - Events() channel size
  • go.logs.channel.enable (bool, false) - Forward log to Logs() channel.
  • go.logs.channel (chan kafka.LogEvent, nil) - Forward logs to application-provided channel instead of Logs(). Requires go.logs.channel.enable=true.

WARNING: Due to the buffering nature of channels (and queues in general) the use of the events channel risks receiving outdated events and messages. Minimizing go.events.channel.size reduces the risk and number of outdated events and messages but does not eliminate the factor completely. With a channel size of 1 at most one event or message may be outdated.

Assign an atomic set of partitions to consume.

The .Offset field of each TopicPartition must either be set to an absolute starting offset (>= 0), or one of the logical offsets (`kafka.OffsetEnd` etc), but should typically be set to `kafka.OffsetStored` to have the consumer use the committed offset as a start position, with a fallback to `auto.offset.reset` if there is no committed offset.

This replaces the current assignment.

AssignmentLost returns true if the current partition assignment has been lost. This method is only applicable for use with a subscribing consumer when handling a rebalance event or callback. Partitions that have been lost may already be owned by other members in the group and therefore committing offsets, for example, may fail.

Close Consumer instance. The object is no longer usable after this call.

Commit offsets for currently assigned partitions This is a blocking call. Returns the committed offsets on success.

CommitMessage commits offset based on the provided message. This is a blocking call. Returns the committed offsets on success.

Events returns the Events channel (if enabled)

GetConsumerGroupMetadata returns the consumer's current group metadata. This object should be passed to the transactional producer's SendOffsetsToTransaction() API.

GetMetadata queries broker for cluster and topic metadata. If topic is non-nil only information about that topic is returned, else if allTopics is false only information about locally used topics is returned, else information about all topics is returned. GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.

GetRebalanceProtocol returns the current consumer group rebalance protocol, which is either "EAGER" or "COOPERATIVE". If the rebalance protocol is not known in the current state an empty string is returned. Should typically only be called during rebalancing.

GetWatermarkOffsets returns the cached low and high offsets for the given topic and partition. The high offset is populated on every fetch response or via calling QueryWatermarkOffsets. The low offset is populated every statistics.interval.ms if that value is set. OffsetInvalid will be returned if there is no cached offset for either value.

IncrementalAssign adds the specified partitions to the current set of partitions to consume.

The .Offset field of each TopicPartition must either be set to an absolute starting offset (>= 0), or one of the logical offsets (`kafka.OffsetEnd` etc), but should typically be set to `kafka.OffsetStored` to have the consumer use the committed offset as a start position, with a fallback to `auto.offset.reset` if there is no committed offset.

The new partitions must not be part of the current assignment.

IncrementalUnassign removes the specified partitions from the current set of partitions to consume.

The .Offset field of the TopicPartition is ignored.

The removed partitions must be part of the current assignment.

Logs returns the log channel if enabled, or nil otherwise.

OffsetsForTimes looks up offsets by timestamp for the given partitions.

The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If the provided timestamp exceeds that of the last message in the partition, a value of -1 will be returned.

The timestamps to query are represented as `.Offset` in the `times` argument and the looked up offsets are represented as `.Offset` in the returned `offsets` list.

The function will block for at most timeoutMs milliseconds.

Duplicate Topic+Partitions are not supported. Per-partition errors may be returned in the `.Error` field.

Source: https://pkg.go.dev/github.com/confluentinc/confluent-kafka-go/kafka


Kafka Go Client

Confluent develops and maintains a Go client for Apache Kafka® that offers a producer and a consumer.

Go Client installation

The Go client, called confluent-kafka-go, is distributed via GitHub and gopkg.in to pin to specific versions. The Go client uses librdkafka, the C client, internally and exposes it as a Go library using cgo. Starting with confluent-kafka-go v1.4, the librdkafka client is included in the Go client and no separate installation of librdkafka is required for the supported platforms (Linux (glibc and musl based) and Mac OSX).

For other platforms the following instructions still apply: to install the Go client, first install the C client, including its development package, as well as a C build toolchain including pkg-config. On Red Hat-based Linux distributions, install the following packages in addition to librdkafka:

sudo yum groupinstall "Development Tools"

On Debian-based distributions, install the following in addition to librdkafka:

sudo apt-get install build-essential pkg-config git

On macOS using Homebrew, install the following:

brew install pkg-config git

Next, use go get to install the library:

go get gopkg.in/confluentinc/confluent-kafka-go.v1/kafka

Your Go code can now import and use the client. You can also build and run a small command-line utility, go-kafkacat, to ensure the installation was successful:

go get gopkg.in/confluentinc/confluent-kafka-go.v1/examples/go-kafkacat
$GOPATH/bin/go-kafkacat --help

See the clients documentation for code examples showing how to use the library.

The source code is also available in the ZIP and TAR archives under the directory .

Go Client example code

For Hello World examples of Kafka clients in Go, see Go. All examples include a producer and consumer that can connect to any Kafka cluster running on-premises or in Confluent Cloud. They also include examples of how to produce and consume Avro data.

Kafka Producer

Initialization

The Go client uses a ConfigMap object to pass configuration to the producer:

import ("github.com/confluentinc/confluent-kafka-go/kafka") p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "host,host", "client.id": socket.gethostname(), "acks": "all"})if err != nil { fmt.Printf("Failed to create producer: %s\n", err) os.Exit(1)}

Asynchronous writes

In Go, you initiate a send by calling the Produce() method, passing a Message object and an optional chan Event that can be used to listen for the result of the send. The Message object contains an Opaque field that can be used to pass arbitrary data with the message to the subsequent event handler:

// The channel size (10000 here) is only illustrative.
delivery_chan := make(chan kafka.Event, 10000)

err = p.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    Value:          []byte(value),
}, delivery_chan)

To produce asynchronously, you can use a goroutine to handle message delivery reports and possibly other event types (errors, stats, etc.) concurrently:

go func() {
    for e := range p.Events() {
        switch ev := e.(type) {
        case *kafka.Message:
            if ev.TopicPartition.Error != nil {
                fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition)
            } else {
                fmt.Printf("Successfully produced record to topic %s partition [%d] @ offset %v\n",
                    *ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset)
            }
        }
    }
}()

Synchronous writes

Making writes synchronous is typically a bad idea since it decreases throughput. Nevertheless, you can make writes synchronous by receiving from the delivery channel passed to the Produce() call, as shown below:

delivery_chan := make(chan kafka.Event, 10000)

err = p.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    Value:          []byte(value),
}, delivery_chan)

// Block until the delivery report for this message arrives.
e := <-delivery_chan
m := e.(*kafka.Message)

if m.TopicPartition.Error != nil {
    fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
    fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
        *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}

close(delivery_chan)

Alternatively, you can wait for all messages to be acknowledged by using the Flush() method:
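A minimal sketch (the 15-second timeout here is illustrative, not a value from this page):

    // Wait for outstanding messages and delivery reports to be handled.
    remaining := p.Flush(15 * 1000)
    if remaining > 0 {
        fmt.Printf("%d message(s) were not delivered before the timeout\n", remaining)
    }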

Important

Flush() will only serve the producer's Events() channel, not application-specified delivery channels. If Flush() is called and no goroutine is processing the delivery channel, its buffer may fill up and cause the flush to time out.

Kafka Consumer

Initialization

The Go client uses a ConfigMap object to pass configuration to the consumer:

import("github.com/confluentinc/confluent-kafka-go/kafka")consumer,err:=kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers":"host,host","group.id":"foo","auto.offset.reset":"smallest"})

Basic usage

The Go client uses librdkafka internally, so it follows a multi-threaded approach to Kafka consumption. The Poll() API, however, returns only a single message or event at a time:

for run == true {
    ev := consumer.Poll(0)
    switch e := ev.(type) {
    case *kafka.Message:
        // application-specific processing
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}

Go Client code examples

Basic poll loop

The consumer API is centered around the Poll() method, which is used to retrieve records from the brokers. The SubscribeTopics() method controls which topics will be fetched in poll. This consumer example shows typical usage, which involves an initial call to SubscribeTopics() to set up the topics of interest, followed by a loop that calls Poll() until the application is shut down.

err = consumer.SubscribeTopics(topics, nil)

for run == true {
    ev := consumer.Poll(0)
    switch e := ev.(type) {
    case *kafka.Message:
        fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value))
    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}

consumer.Close()

If no message or event is available within the poll timeout, Poll() returns nil.

Note that you should always call Close() after you are finished using the consumer. Doing so will ensure that active sockets are closed and internal state is cleaned up. It will also trigger a group rebalance immediately, which ensures that any partitions owned by the consumer are re-assigned to another member in the group. If the consumer is not closed properly, the broker will trigger the rebalance only after the session timeout has expired. Make sure your shutdown path gives the consumer time to finish closing before the process exits, as in the sketch below.
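One possible shutdown sketch, assuming the poll loop runs in the main goroutine and is stopped via a signal channel (the variable names and the os/signal wiring are illustrative, not part of the original example):

    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

    run := true
    for run {
        select {
        case sig := <-sigchan:
            fmt.Printf("Caught signal %v: terminating\n", sig)
            run = false
        default:
            ev := consumer.Poll(100)
            if ev == nil {
                continue
            }
            switch e := ev.(type) {
            case *kafka.Message:
                fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value))
            case kafka.Error:
                fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
            }
        }
    }

    // Leaves the group cleanly and triggers an immediate rebalance.
    consumer.Close()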

Synchronous commits

The Go client provides a synchronous Commit() method call. Other commit variants, CommitOffsets() and CommitMessage(), accept a list of offsets to commit or a Message in order to commit offsets relative to a consumed message. When using manual offset commits, be sure to disable the enable.auto.commit configuration.

msg_count := 0
for run == true {
    ev := consumer.Poll(0)
    switch e := ev.(type) {
    case *kafka.Message:
        msg_count += 1
        if msg_count%MIN_COMMIT_COUNT == 0 {
            consumer.Commit()
        }
        fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value))
    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}

In this example, a synchronous commit is triggered every MIN_COMMIT_COUNT messages. You could also trigger the commit on expiration of a timeout to ensure the committed position is updated regularly, as in the sketch below.
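One way to commit on a timer, assuming the same consumer and run variables as above (the 5-second interval is illustrative):

    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for run == true {
        select {
        case <-ticker.C:
            if _, err := consumer.Commit(); err != nil {
                // A "no offset" error just means there was nothing new to commit.
                fmt.Fprintf(os.Stderr, "%% Commit failed: %v\n", err)
            }
        default:
            ev := consumer.Poll(100)
            if ev == nil {
                continue
            }
            if e, ok := ev.(*kafka.Message); ok {
                fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value))
            }
        }
    }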

Delivery guarantees

In the previous example, you get “at least once” delivery since the commit follows the message processing. By changing the order, however, you can get “at most once” delivery, but you must be a little careful with commit failures.

for run == true {
    ev := consumer.Poll(0)
    switch e := ev.(type) {
    case *kafka.Message:
        // Commit before processing for "at most once" semantics.
        _, err = consumer.CommitMessage(e)
        if err == nil {
            msg_process(e)
        }
    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}

For simplicity, in this example CommitMessage() is used prior to processing the message. Committing on every message would produce a lot of overhead in practice. A better approach would be to collect a batch of messages, execute the synchronous commit, and then process the messages only if the commit succeeded, as sketched below.
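A rough sketch of that batching approach, assuming a msg_process helper and a BATCH_SIZE constant that are not part of the original example:

    batch := make([]*kafka.Message, 0, BATCH_SIZE)

    for run == true {
        ev := consumer.Poll(100)
        switch e := ev.(type) {
        case *kafka.Message:
            batch = append(batch, e)
            if len(batch) >= BATCH_SIZE {
                // Commit the batch's offsets first; only process if the commit succeeded.
                if _, err := consumer.Commit(); err != nil {
                    fmt.Fprintf(os.Stderr, "%% Commit failed, dropping batch: %v\n", err)
                } else {
                    for _, m := range batch {
                        msg_process(m)
                    }
                }
                batch = batch[:0]
            }
        case kafka.Error:
            fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
            run = false
        }
    }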

Asynchronous commits

To commit asynchronously, simply execute the commit in a goroutine:

msg_count := 0
for run == true {
    ev := consumer.Poll(0)
    switch e := ev.(type) {
    case *kafka.Message:
        msg_count += 1
        if msg_count%MIN_COMMIT_COUNT == 0 {
            go func() {
                offsets, err := consumer.Commit()
                if err != nil {
                    fmt.Fprintf(os.Stderr, "%% Commit failed: %v\n", err)
                } else {
                    fmt.Printf("%% Committed offsets: %v\n", offsets)
                }
            }()
        }
        fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value))
    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}

Rebalance events are exposed as events returned by the Poll() method. To see these events, you must create the consumer with the go.application.rebalance.enable configuration set to true and handle AssignedPartitions and RevokedPartitions events by explicitly calling Assign() and Unassign(), respectively:

consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers":               "host1:9092,host2:9092",
    "group.id":                        "foo",
    "go.application.rebalance.enable": true,
})

msg_count := 0
for run == true {
    ev := consumer.Poll(0)
    switch e := ev.(type) {
    case kafka.AssignedPartitions:
        fmt.Fprintf(os.Stderr, "%% %v\n", e)
        consumer.Assign(e.Partitions)
    case kafka.RevokedPartitions:
        fmt.Fprintf(os.Stderr, "%% %v\n", e)
        consumer.Unassign()
    case *kafka.Message:
        msg_count += 1
        if msg_count%MIN_COMMIT_COUNT == 0 {
            consumer.Commit()
        }
        fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value))
    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}

API documentation

Click here to view the Go Client API documentation.


Source: https://docs.confluent.io/clients-confluent-kafka-go/current/overview.html