Configuring Message Accumulation Monitoring
Unprocessed messages accumulate when the client consumes messages more slowly than the server sends them. If accumulated messages cannot be consumed in time, you can configure alarm rules so that you are notified when the number of accumulated messages in a consumer group exceeds a threshold.
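In Kafka terms, the number of accumulated messages is the consumer lag: the difference between the latest offset written to a partition and the offset the consumer group has committed. Purely as an illustration (not needed for this lab), the lag of a single partition could be computed with the sarama Kafka client, the same client library used by the simulation later in this guide. The broker address, topic, group name, and the IBM/sarama import path below are placeholders and assumptions, not values from this lab:

package main

import (
    "fmt"
    "log"

    "github.com/IBM/sarama"
)

func main() {
    // Placeholders: use your DMS Kafka connection address, topic, and consumer group.
    brokers := []string{"192.168.0.10:9092"}
    topic := "test_topic_dms"
    group := "test_group"
    partition := int32(0)

    client, err := sarama.NewClient(brokers, sarama.NewConfig())
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Latest offset written to the partition.
    newest, err := client.GetOffset(topic, partition, sarama.OffsetNewest)
    if err != nil {
        log.Fatal(err)
    }

    // Offset the consumer group has committed so far (-1 if nothing was committed yet).
    admin, err := sarama.NewClusterAdminFromClient(client)
    if err != nil {
        log.Fatal(err)
    }
    offsets, err := admin.ListConsumerGroupOffsets(group, map[string][]int32{topic: {partition}})
    if err != nil {
        log.Fatal(err)
    }
    committed := offsets.GetBlock(topic, partition).Offset

    // Accumulated (unprocessed) messages on this partition.
    fmt.Printf("lag on partition %d: %d\n", partition, newest-committed)
}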
Configuring Simple Message Notification
Creating a Topic
- Log in to the Open Telekom Cloud Console -> Simple Message Notification -> Topics and click Create Topic.
- Give it a name, e.g. test_topic and click OK.
Creating a Subscription
- Go to the Open Telekom Cloud Console -> Simple Message Notification -> Subscriptions and click Add Subscription.
Figure 1 Add a Subscription to the Topic
- Click Select Topic and choose the topic we just created.
- Set Protocol to Email.
- Add an Endpoint (an email address you have direct access to) and click OK.
Back in the Subscriptions console, click Request Confirmation:
Figure 2 Request Subscription Confirmation
and click Confirm Subscription in the "SMN-Confirming Your Subscription" email:
Figure 3 Respond to the Subscription Confirmation Request Email
Configuring Cloud Eye
- Log in to the Open Telekom Cloud Console -> Cloud Eye -> Distributed Message Service -> Kafka Platinum.
- Find your Kafka instance in the list and click Create Alarm Rule.
Figure 4 Create a Cloud Eye Alarm Rule
Creating an Alarm Rule
- Set Template to DMS Kafka Instance Alarm Template.
- Set Notification Object to the name of the SMN topic we created in the previous step (in this case test_topic) and click Create.
Figure 5 Configure the Cloud Eye Alarm Rule
Running a Simulation
We are going to rely on a small Golang package that creates a DMS Kafka topic, a message producer, and a set of message consumers in order to simulate accumulated unprocessed messages in our Kafka instance.
You can find more information about the implementation details of this package in the Best Practice: Optimizing Consumer Polling.
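For orientation, the producer side of such a package boils down to a plain sarama synchronous producer, roughly as sketched below. This is a simplified sketch under assumptions, not the repository's actual code; the function name, topic, and payload are placeholders, and the fmt and sarama imports are assumed to be present:

// Sketch of a synchronous producer sending `count` messages to `topic`.
func produce(brokers []string, topic string, count int) error {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true // the sync producer requires this

    producer, err := sarama.NewSyncProducer(brokers, config)
    if err != nil {
        return err
    }
    defer producer.Close()

    for i := 0; i < count; i++ {
        msg := &sarama.ProducerMessage{
            Topic: topic,
            Value: sarama.StringEncoder(fmt.Sprintf("message-%d", i)),
        }
        if _, _, err := producer.SendMessage(msg); err != nil {
            return err
        }
    }
    return nil
}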
Cloning the Repository
Clone the repo to your local development machine:
git clone https://github.com/opentelekomcloud-blueprints/kafka-optimizing-consumer-pulling.git
Configuring the Parameters
For this lab, we want to handicap the consumers so that we can simulate an accumulation of unprocessed messages. To do so, open main.go in the editor of your choice and make the following changes:
- Set messages to 1000 -> This produces a significant number of messages, making the accumulation easy to observe.
var (
    consumers  = 5
    partitions = 3
    messages   = 1000
    logLevel   = slog.LevelInfo
    cleanExit  = true
)
- Raise MaxWaitTime to 1000 milliseconds -> For more details on how MaxWaitTime affects the consumers, consult Optimizing Consumer Polling - Use Long Polling.
func newConsumer(ctx context.Context, consumerId int, wg *sync.WaitGroup) {
    defer wg.Done()
    config := sarama.NewConfig()
    config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
    config.Consumer.Offsets.Initial = sarama.OffsetOldest
    config.Consumer.MaxWaitTime = 1000 * time.Millisecond
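For context, the rest of such a consumer typically joins a consumer group built from this config and calls Consume in a loop. The sketch below is an assumption about how the package might be structured, not its actual code; the group and topic names are placeholders, and the handler does nothing but mark messages as processed:

// Sketch of a consumer group loop using the config above; assumes the
// context and sarama imports already present in main.go.
type handler struct{}

func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
        // Process the message, then mark it so its offset gets committed.
        session.MarkMessage(message, "")
    }
    return nil
}

func consume(ctx context.Context, brokers []string, config *sarama.Config) error {
    group, err := sarama.NewConsumerGroup(brokers, "test_group", config)
    if err != nil {
        return err
    }
    defer group.Close()

    for {
        // Consume returns whenever the group rebalances, so it is called in a loop.
        if err := group.Consume(ctx, []string{"test_topic_dms"}, handler{}); err != nil {
            return err
        }
        if ctx.Err() != nil {
            return ctx.Err()
        }
    }
}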
- Start the simulation and go to Open Telekom Cloud Console -> Distributed Message Service -> Monitoring -> Monitoring Details. You will notice that the consumers start lagging behind and unprocessed messages begin to accumulate:
Figure 6 Kafka Instance Monitoring Details
Soon enough, when the threshold set in the alarm rule is exceeded, you will receive an email from SMN informing you that a Major Alarm has been triggered by Cloud Eye for the Kafka instance:
Figure 7 Major Alarm is triggered
- Let the simulation continue. As time passes and the consumers start catching up, you will be informed by a second email that Cloud Eye has de-escalated the situation and the Major Alarm has been cleared:
Figure 8 Major Alarm is over
You can set up alerts for a variety of Kafka metrics beyond Accumulated Messages. In addition to targeting the Kafka instance itself, you have the option to focus on individual brokers, specific topics, or distinct consumer groups. This flexibility allows for more granular monitoring and quicker responses to potential issues within your DMS/Kafka environment.