Migrating Kafka Services
This solution demonstrates how to migrate Kafka services: how to connect message producers and consumers to a new Kafka instance and, if needed, how to migrate persisted message data to that instance as well.
Kafka services can be migrated in the following two scenarios:
- Migrating services to the cloud without downtime
  Services with strict continuity requirements cannot afford long downtime, so they must be migrated to the cloud smoothly.
- Re-deploying services in the cloud
  A Kafka instance deployed within a single AZ is not capable of cross-AZ disaster recovery. For higher reliability, you can re-deploy services to an instance that is deployed across AZs.
Prerequisites
- Configure the network environment.
  A Kafka instance can be accessed within a VPC or over a public network. For public network access, the producer and consumer must have public access permissions, and the following security group rules must be configured:
  - Inbound: TCP/9094, 0.0.0.0/0 -> Access Kafka through the public network (without SSL encryption)
  - Inbound: TCP/9095, 0.0.0.0/0 -> Access Kafka through the public network (with SSL encryption)
- Create a Kafka instance.
  The specifications of the new instance cannot be lower than those of the original instance. For details, see Creating an Instance.
- Create a topic.
  Create a topic on the new instance with the same configurations as on the original Kafka instance, including the topic name, number of replicas, number of partitions, message aging time, and whether to enable synchronous replication and flushing. For details, see Creating a Topic.
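A mismatch between the original and new topic settings is easy to miss when topics are created by hand. The following minimal sketch compares the settings listed in the topic prerequisite and reports the ones that differ. The TopicSpec class, its field names, and the sample values are hypothetical and only illustrate the idea; fill them in from whatever source (console or CLI output) you use to read the topic configuration.

```python
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class TopicSpec:
    """Topic settings named in the prerequisite (field names are illustrative)."""
    name: str
    partitions: int
    replicas: int
    aging_time_hours: int
    sync_replication: bool
    sync_flushing: bool


def diff_topic_specs(original: TopicSpec, new: TopicSpec) -> dict:
    """Return {setting: (original_value, new_value)} for every setting that differs."""
    return {
        f.name: (getattr(original, f.name), getattr(new, f.name))
        for f in fields(TopicSpec)
        if getattr(original, f.name) != getattr(new, f.name)
    }


if __name__ == "__main__":
    # Hypothetical values: the new topic was created with too few partitions.
    original = TopicSpec("orders", 6, 3, 72, False, False)
    new = TopicSpec("orders", 3, 3, 72, False, False)
    print(diff_topic_specs(original, new))  # {'partitions': (6, 3)}
```

An empty result means that the two topics match on all compared settings.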
Migration Scheme 1: Migrating the Production First
Migrate the message production service to the new Kafka instance. After migration, the original Kafka instance will no longer produce messages. After all messages of the original Kafka instance are consumed, migrate the message consumption service to the new Kafka instance to consume messages of this instance.
- Change the Kafka connection address of the producer to that of the new Kafka instance.
- Restart the production service so that the producer can send new messages to the new Kafka instance.
- Check the consumption progress of each consumer group in the original Kafka instance until all data in the original Kafka instance is consumed.
- Change the Kafka connection address of each consumer to that of the new Kafka instance.
- Restart the consumption service so that consumers can consume messages from the new Kafka instance.
- Check whether consumers consume messages properly from the new Kafka instance.
- The migration is completed.
This is a common migration scheme. It is simple and easy to control on the service side. Message order is preserved throughout the migration, so this scheme suits scenarios with strict requirements on the message sequence. However, messages sent to the new Kafka instance are not consumed until the original instance has been fully consumed and the consumers have been switched over, so consumption latency may increase during that period.
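The step that checks the consumption progress comes down to comparing, for each consumer group, the committed offset of every partition with the log end offset of that partition. The following is a minimal sketch of that comparison. It assumes that you have already collected both sets of offsets, for example from the console or from the CURRENT-OFFSET and LOG-END-OFFSET columns printed by the open-source kafka-consumer-groups.sh tool with --describe. The function names and the sample numbers are hypothetical.

```python
def remaining_lag(end_offsets: dict, committed_offsets: dict) -> dict:
    """Return the unconsumed message count per (topic, partition).

    Assumption: a partition without a committed offset is treated as
    unconsumed from offset 0, which is the conservative choice here.
    """
    lag = {}
    for partition, end in end_offsets.items():
        committed = committed_offsets.get(partition, 0)
        lag[partition] = max(end - committed, 0)
    return lag


def is_drained(end_offsets: dict, committed_offsets: dict) -> bool:
    """True when the consumer group has no unconsumed messages left."""
    return all(v == 0 for v in remaining_lag(end_offsets, committed_offsets).values())


if __name__ == "__main__":
    # Hypothetical snapshot taken after the producer was switched to the new instance.
    end = {("orders", 0): 1200, ("orders", 1): 950}
    committed = {("orders", 0): 1200, ("orders", 1): 900}
    print(remaining_lag(end, committed))  # {('orders', 0): 0, ('orders', 1): 50}
    print(is_drained(end, committed))     # False
```

Run the check for every consumer group, and only after the producer has stopped writing to the original instance. Otherwise the end offsets keep moving and a lag of zero is only momentary.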
Migration Scheme 2: Migrating the Production Later
Use multiple consumers for the consumption service. Some consume messages from the original Kafka instance, and others consume messages from the new Kafka instance. Then, migrate the production service to the new Kafka instance so that all messages can be consumed in time.
- Start new consumer clients, set their Kafka connection address to that of the new Kafka instance, and consume data from the new Kafka instance.
  Note: Original consumer clients must continue running. Messages are consumed from both the original and new Kafka instances.
- Change the Kafka connection address of the producer to that of the new Kafka instance.
- Restart the producer client to migrate the production service to the new Kafka instance.
- After the production service is migrated, check whether the consumption service connected to the new Kafka instance is normal.
- After all data in the original Kafka instance is consumed, close the original consumption clients.
- The migration is completed.
In this scheme, the migration process is controlled by services. For a certain period of time, the consumption service consumes messages from both the original and new Kafka instances. Before the migration, message consumption from the new Kafka instance has already started, so there is no latency. However, early on in the migration, data is consumed from both the original and new Kafka instances, so the messages may not be consumed in the order that they are produced. This scheme is suitable for services that require low latency but do not require strict message sequence.
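The ordering trade-off between the two schemes can be illustrated with a small simulation. This is a deliberately simplified sketch and not a model of real consumer scheduling. It assumes that messages carry increasing sequence numbers, that the original instance still holds a backlog when the producer switches, and that in scheme 2 the two consumers take turns processing one message each. All names and values are hypothetical.

```python
from itertools import zip_longest

_MISSING = object()  # marks the side that has run out of messages


def drain_then_switch(original_backlog, new_messages):
    """Scheme 1: the original instance is fully consumed before the new one."""
    return list(original_backlog) + list(new_messages)


def consume_in_parallel(original_backlog, new_messages):
    """Scheme 2 (simplified): consumers of both instances alternate, one message per turn."""
    processed = []
    for old_msg, new_msg in zip_longest(original_backlog, new_messages, fillvalue=_MISSING):
        if old_msg is not _MISSING:
            processed.append(old_msg)
        if new_msg is not _MISSING:
            processed.append(new_msg)
    return processed


if __name__ == "__main__":
    backlog = [1, 2, 3]    # produced before the switch, still on the original instance
    after_switch = [4, 5, 6]  # produced to the new instance
    print(drain_then_switch(backlog, after_switch))    # [1, 2, 3, 4, 5, 6]
    print(consume_in_parallel(backlog, after_switch))  # [1, 4, 2, 5, 3, 6]
```

In the second result, message 4 is processed before messages 2 and 3, which is the out-of-order consumption that scheme 2 accepts in exchange for lower latency.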
How Do I Migrate Persisted Data Along with Services?
You can migrate consumed data from the original instance to a new instance by using the open-source tool MirrorMaker. This tool mirrors the original Kafka producer and consumer into new ones and migrates data to the new Kafka instance. For details, see Using MirrorMaker to Synchronize Data Across Clusters.
Each Open Telekom Cloud Kafka instance stores data in three replicas. Therefore, the storage space of the new instance should be three times that of the original single-replica message storage.
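The sizing rule above is plain multiplication. The following minimal sketch makes it explicit. The function name and the 200 GB figure are hypothetical, and the result covers replicated message data only; it does not add headroom for growth.

```python
def required_storage_gb(single_replica_gb, replicas=3):
    """Storage needed on the new instance for the given single-replica data volume."""
    if single_replica_gb < 0:
        raise ValueError("single_replica_gb must not be negative")
    if replicas < 1:
        raise ValueError("replicas must be at least 1")
    return single_replica_gb * replicas


if __name__ == "__main__":
    # Hypothetical example: 200 GB of single-replica message data on the original instance.
    print(required_storage_gb(200))  # 600
```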
Using MirrorMaker to Synchronize Data Across Clusters
In the following scenarios, MirrorMaker can be used to synchronize data between different Kafka clusters to ensure the availability and reliability of the clusters:
- Backup and disaster recovery: An enterprise has multiple data centers. To prevent service unavailability caused by a fault in one data center, cluster data is synchronously backed up in multiple data centers.
- Cluster migration: As enterprises migrate services to the cloud, data in on-premises clusters must be synchronized with that in cloud clusters to ensure service continuity.
Solution Design
MirrorMaker can be used to mirror data from the source cluster to the target cluster. As shown in the figure below, MirrorMaker in essence first consumes data from the source cluster and then produces the consumed data to the target cluster. For more information about MirrorMaker, see Mirroring data between clusters.
- The IP addresses and port numbers of the nodes in the source cluster cannot be the same as those of the nodes in the target cluster. Otherwise, data will be replicated infinitely in a topic.
- Use MirrorMaker to synchronize data between at least two clusters. If there is only one cluster, data will be replicated infinitely in a topic.
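The first restriction can be checked before MirrorMaker is started by comparing the two bootstrap.servers values. The following is a minimal sketch, assuming that the addresses are written as host:port pairs separated by commas. The function names and sample addresses are hypothetical. The comparison is purely textual, so it does not detect a host name and an IP address that point to the same node.

```python
def parse_bootstrap_servers(value: str) -> set:
    """Split 'host1:port, host2:port' into a set of (host, port) tuples."""
    endpoints = set()
    for item in value.split(","):
        item = item.strip()
        if not item:
            continue
        host, _, port = item.rpartition(":")
        if not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {item!r}")
        endpoints.add((host.lower(), int(port)))
    return endpoints


def shared_endpoints(source_servers: str, target_servers: str) -> set:
    """Return the endpoints that appear in both clusters (should be empty)."""
    return parse_bootstrap_servers(source_servers) & parse_bootstrap_servers(target_servers)


if __name__ == "__main__":
    # Hypothetical addresses for the source cluster (A) and the target cluster (B).
    a = "192.168.0.10:9092, 192.168.0.11:9092"
    b = "192.168.1.10:9092, 192.168.0.11:9092"
    print(shared_endpoints(a, b))  # {('192.168.0.11', 9092)}
```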
Installing and Configuring MirrorMaker
- Create an ECS that can communicate with the source and target clusters. For details, see the ECS documentation.
- Log in to the ECS, install JDK, and add the following contents to .bash_profile in the home directory to configure the environment variables JAVA_HOME and PATH. In this example, /opt/java/jdk1.8.0_151 is the JDK installation path. Change it to the path where you install JDK.
    export JAVA_HOME=/opt/java/jdk1.8.0_151
    export PATH=$JAVA_HOME/bin:$PATH
  Run the source .bash_profile command for the modification to take effect.
  Note: Use Oracle JDK instead of ECS's default JDK (for example, OpenJDK), because ECS's default JDK may not be suitable. Obtain Oracle JDK 1.8.111 or later from Oracle's official website.
- Download the binary software package of Kafka 3.3.1.
    wget https://archive.apache.org/dist/kafka/3.3.1/kafka_2.12-3.3.1.tgz
- Decompress the binary software package.
    tar -zxvf kafka_2.12-3.3.1.tgz
- Go to the binary software package directory and specify the IP addresses and ports of the source and target clusters and other parameters in the connect-mirror-maker.properties configuration file in the config directory.
  connect-mirror-maker.properties
    # Specify two clusters.
    clusters = A, B
    A.bootstrap.servers = A_host1:A_port, A_host2:A_port, A_host3:A_port
    B.bootstrap.servers = B_host1:B_port, B_host2:B_port, B_host3:B_port
    # Specify the data synchronization direction. The data can be synchronized unidirectionally or bidirectionally.
    A->B.enabled = true
    # Specify the topics to be synchronized. Regular expressions are supported, for example, foo-.*. By default, all topics are replicated.
    A->B.topics = .*
    # If the following two configurations are enabled, clusters A and B replicate data with each other.
    #B->A.enabled = true
    #B->A.topics = .*
    # Specify the number of replicas. If multiple topics need to be synchronized and their replica quantities are different, create topics with the same name and replica quantity before starting MirrorMaker.
    replication.factor=3
    # Specify the consumer offset synchronization direction (unidirectionally or bidirectionally).
    A->B.sync.group.offsets.enabled=true
    ############################# Internal Topic Settings #############################
    # The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and
    # "mm2-offset-syncs.B.internal"
    # In the test environment, the value can be 1. In the production environment, it is recommended that the value be greater than 1, for example, 3.
    checkpoints.topic.replication.factor=3
    heartbeats.topic.replication.factor=3
    offset-syncs.topic.replication.factor=3
    # The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and
    # "mm2-status.B.internal"
    # In the test environment, the value can be 1. In the production environment, it is recommended that the value be greater than 1, for example, 3.
    offset.storage.replication.factor=3
    status.storage.replication.factor=3
    config.storage.replication.factor=3
    # customize as needed
    # replication.policy.separator = _
    # sync.topic.acls.enabled = false
    # emit.heartbeats.interval.seconds = 5
- In the binary software package directory, start MirrorMaker to synchronize data.
    ./bin/connect-mirror-maker.sh config/connect-mirror-maker.properties
- (Optional) If a topic is created in the source cluster after MirrorMaker has been started, and the topic data needs to be synchronized, restart MirrorMaker.
  Note: To periodically synchronize new topics without restarting MirrorMaker, configure the topic refresh parameters in connect-mirror-maker.properties. refresh.topics.interval.seconds is mandatory. Other parameters are optional.
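Before starting MirrorMaker, it can help to confirm which replication directions the configuration file actually enables, because a commented-out or misspelled line is easy to overlook. The following is a minimal sketch that reads the file content as simple key = value lines and lists the enabled flows. It is not a full Java properties parser (it handles only = separators and # comments, as used in the example file), and the function names are hypothetical.

```python
import re


def load_properties(text: str) -> dict:
    """Parse 'key = value' lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def enabled_flows(props: dict) -> list:
    """Return (source, target) pairs whose '<source>-><target>.enabled' is true."""
    flows = []
    for key, value in props.items():
        match = re.fullmatch(r"(.+?)->([^.]+)\.enabled", key)
        if match and value.lower() == "true":
            flows.append((match.group(1), match.group(2)))
    return flows


def undeclared_aliases(props: dict) -> set:
    """Return aliases used in enabled flows but missing from the 'clusters' list."""
    declared = {c.strip() for c in props.get("clusters", "").split(",") if c.strip()}
    used = {alias for flow in enabled_flows(props) for alias in flow}
    return used - declared


if __name__ == "__main__":
    sample = """
    clusters = A, B
    A->B.enabled = true
    #B->A.enabled = true
    A->B.sync.group.offsets.enabled=true
    """
    props = load_properties(sample)
    print(enabled_flows(props))       # [('A', 'B')]
    print(undeclared_aliases(props))  # set()
```

To check a real file, pass its content to load_properties, for example the text read from config/connect-mirror-maker.properties.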
MirrorMaker Configuration Properties
Parameter | Default Value | Description |
---|---|---|
sync.topic.configs.enabled | true | Whether to monitor the source cluster for configuration changes. |
sync.topic.acls.enabled | true | Whether to monitor the source cluster for ACL changes. |
emit.heartbeats.enabled | true | Whether to let the connector send heartbeats periodically. |
emit.heartbeats.interval.seconds | 5 seconds | Heartbeat frequency. |
emit.checkpoints.enabled | true | Whether to let the connector periodically send the consumer offset information. |
emit.checkpoints.interval.seconds | 5 seconds | Checkpoint frequency. |
refresh.topics.enabled | true | Whether to let the connector periodically check for new topics. |
refresh.topics.interval.seconds | 5 seconds | Frequency of checking for new topics in the source cluster. |
refresh.groups.enabled | true | Whether to let the connector periodically check for new consumer groups. |
refresh.groups.interval.seconds | 5 seconds | Frequency of checking for new consumer groups in the source cluster. |
replication.policy.class | org.apache.kafka.connect.mirror.DefaultReplicationPolicy | Use LegacyReplicationPolicy to imitate MirrorMaker of an earlier version. |
heartbeats.topic.retention.ms | One day | Used when heartbeat topics are created for the first time. |
checkpoints.topic.retention.ms | One day | Used when checkpoint topics are created for the first time. |
offset.syncs.topic.retention.ms | max long | Used when offset sync topics are created for the first time. |
Verifying Data Synchronization
- View the topic list in the target cluster to check whether the source topics are present.
  Note: Topic names in the target cluster have a prefix (for example, A.) added to the source topic name. This is a MirrorMaker 2 configuration for preventing cyclic topic backup.
- Produce and consume messages in the source cluster, view the consumption progress in the target cluster, and check whether data has been synchronized from the source cluster to the target cluster.
  If the target cluster is an Open Telekom Cloud Kafka instance, view the consumption progress on the Consumer Groups page.
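The topic list check can be scripted once both topic lists are available. The following minimal sketch derives the expected topic name in the target cluster from the source cluster alias and the separator, then reports the source topics that have no mirrored counterpart yet. It assumes the default naming described in the note (alias, separator, topic name) with "." as the separator; if replication.policy.separator is customized, pass that value instead. The function names and the sample topics are hypothetical.

```python
def mirrored_name(topic: str, source_alias: str, separator: str = ".") -> str:
    """Expected topic name in the target cluster, for example 'A.orders'."""
    return f"{source_alias}{separator}{topic}"


def missing_mirrors(source_topics, target_topics, source_alias, separator="."):
    """Return the source topics whose mirrored topic is absent from the target cluster."""
    present = set(target_topics)
    return sorted(
        topic
        for topic in source_topics
        if mirrored_name(topic, source_alias, separator) not in present
    )


if __name__ == "__main__":
    # Hypothetical topic lists; pass only the topics that are expected to be replicated.
    source = ["orders", "payments"]
    target = ["A.orders", "heartbeats"]
    print(missing_mirrors(source, target, "A"))  # ['payments']
```

An empty list means that every expected topic has been created in the target cluster. It does not confirm that all messages have been copied, so the consumption progress check above is still required.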