A Kafkaesque move, or how to move partitions

Introduction

Hello, Habr! I work as a Java developer at a financial organization. I decided to leave my mark on Habr and write my first article. Because we had no DevOps engineers available, I was tasked with upgrading our Kafka cluster from 2.0 to 2.6 without downtime or message loss (you know, nobody likes it when money hangs in the air or gets lost somewhere). I would like to share this experience with you and get constructive feedback. So, enough preamble, let's get down to business.





Migration scheme

The task was complicated by the need to move from old virtual machines to new ones, so the usual approach of stopping a broker, swapping the binaries and starting it again did not suit me.





Below is a diagram of the migration. We have 3 brokers on the old VMs and 3 brokers on the new VMs, and each broker has its own ZooKeeper node.





Migration plan

To migrate without anyone noticing, we will have to "sweat a little". Here is the general outline.





  1. Add the addresses of the new brokers to the application settings
  2. Open network access between all the hosts involved
  3. Prepare the infrastructure on the new virtual machines
  4. Bring up a new ZooKeeper cluster and merge it with the old one
  5. Bring up the new Kafka brokers
  6. Migrate all partitions from the old brokers to the new ones
  7. Shut down the old Kafka brokers and the old ZooKeeper nodes
  8. Remove the old brokers and ZooKeeper nodes from the new configs









First, the application settings. All six brokers, old and new, go into "bootstrap.servers":





old-server1:9092,old-server2:9092,old-server3:9092,new-server4:9092,new-server5:9092,new-server6:9092







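Next, network access has to be opened between the hosts:

  1. Applications to the new brokers, port 9092 (3 access rules)
  2. Old brokers <----> new brokers, port 9092 (+18 access rules)
  3. New brokers to each other, port 9092 (+6 access rules)
  4. Items 2 and 3 again for the ports 2181, 2888, 3888 ((18+6)*3 = 72)

Total: 99 access rules.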
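
The total can be re-derived with a few lines of shell arithmetic. This is only a sketch: it assumes 3 old and 3 new hosts, that every direction between two hosts counts as a separate rule, and that the applications count as a single source. The variable names are made up for this example.

```shell
#!/bin/bash
# Hypothetical re-calculation of the number of access rules.
old=3          # old hosts
new=3          # new hosts
zk_ports=3     # 2181, 2888, 3888

app_to_new=$new                              # applications -> each new broker, port 9092
old_new=$(( old * new * 2 ))                 # old <-> new, both directions
new_new=$(( new * (new - 1) ))               # each new host -> every other new host
zk=$(( (old_new + new_new) * zk_ports ))     # the same host pairs for each ZooKeeper port
total=$(( app_to_new + old_new + new_new + zk ))

echo "port 9092: $(( app_to_new + old_new + new_new )), zookeeper ports: $zk, total: $total"
```

With the values above the script reports 27 rules for port 9092, 72 for the ZooKeeper ports and 99 in total.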









Now the new virtual machines have to be prepared.

For the kafka user, set the open-file limits in /etc/security/limits.conf:





kafka hard nofile 262144
kafka soft nofile 262144
      
      












Java is set up for the same user. Add the following to /home/kafka/.bash_profile:

export JAVA_HOME=/opt/java
export PATH=$JAVA_HOME/bin:$PATH

The JRE itself goes into /opt/java.









The script below performs all of these steps in one go:





setup.sh
# Run as root. Unpack the JDK and move the distribution into /opt.
tar -xf ../jdk1.8.0_181.tar.gz -C /opt/
mv /home/kafka/kafka /opt
mv /home/kafka/zookeeper /opt
mv /home/kafka/kafka-start.sh /opt
mv /home/kafka/scripts /opt

# Version-independent symlinks
ln -sfn /opt/kafka/kafka_2.13-2.6.0 /opt/kafka/current
ln -sfn /opt/zookeeper/zookeeper-3.6.2 /opt/zookeeper/current
ln -sfn /opt/jdk1.8.0_181/ /opt/java

chown -R kafka:kafka /opt
chmod -R 700 /opt

# Append a line to a file unless exactly that line is already there
appendOnce() {
    grep -qxF -- "$1" "$2" || echo -e "\n$1\n" >> "$2"
}

#env_var start ------------------------------->

kafkaProfile=/home/kafka/.bash_profile

appendOnce "export JAVA_HOME=/opt/java" "$kafkaProfile"
appendOnce "export PATH=\$JAVA_HOME/bin:\$PATH" "$kafkaProfile"

#env_var end --------------------------------->

#ulimit start >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

limitsFile=/etc/security/limits.conf

appendOnce "kafka soft nofile 262144" "$limitsFile"
appendOnce "kafka hard nofile 262144" "$limitsFile"

#ulimit end >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
      
      







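Then log in as the kafka user and check that the settings have been applied:

ulimit -n          # should print 262144
echo $JAVA_HOME    # should print /opt/java
echo $PATH         # should contain /opt/java/bin

Each ZooKeeper node also needs a myid file in its data directory that contains the id of that server.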





The ZooKeeper config on the new servers, /opt/zookeeper/current/conf/zoo.cfg, lists all six nodes:





zoo.cfg
dataDir=/opt/zookeeper/zookeeper-data
server.1=server1:2888:3888
server.2=server2:2888:3888
server.3=server3:2888:3888

server.4=server4:2888:3888
server.5=server5:2888:3888
server.6=server6:2888:3888
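
ZooKeeper identifies a node by the number stored in the myid file inside dataDir, and that number has to match the server.N line for the host. Below is a minimal sketch of creating the file. The function name write_myid is made up for this example and is not part of the original scripts.

```shell
#!/bin/bash
# write_myid DATA_DIR ID: hypothetical helper that creates DATA_DIR/myid
write_myid() {
    local data_dir=$1 id=$2
    mkdir -p "$data_dir"
    echo "$id" > "$data_dir/myid"
}

# Example for server4, with dataDir taken from the zoo.cfg above:
# write_myid /opt/zookeeper/zookeeper-data 4
```

On server5 and server6 the call would use 5 and 6 respectively, so that each id matches its server.N entry.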
      
      







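Start ZooKeeper on each of the new servers:

/opt/zookeeper/current/bin/zkServer.sh start

Next, the old ZooKeeper nodes have to learn about the new ones. On the old servers, append the new servers to zoo.cfg: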





zk-add-new-servers.sh
echo -e "\nserver.4=server4:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg
echo -e "\nserver.5=server5:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg
echo -e "\nserver.6=server6:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg
      
      







After that, restart ZooKeeper on the old servers so that they pick up the new config.





/opt/zookeeper/zookeeper-3.4.13/bin/zkServer.sh restart
      
      



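When the restart is done, check the status of every node:

.../zkServer.sh status

Each node should report either follower or leader, with exactly one leader in the ensemble.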





Now the new brokers. In server.properties, set broker.id and zookeeper.connect for each of them. For example, for broker 4:





broker.id=4
zookeeper.connect=server4:2181,server5:2181,server6:2181/cluster_name
      
      







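The new brokers will run in one cluster with the old ones for a while, so the inter-broker protocol version in server.properties is pinned to the old version (2.0.0). This line is removed once the migration is over.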





inter.broker.protocol.version=2.0.0
      
      







Start the broker:

nohup /opt/kafka/current/bin/kafka-server-start.sh /opt/kafka/config/server.properties > log.log 2>&1 &
      
      



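Check that the new brokers have registered in the cluster. Open the ZooKeeper cli:

/opt/zookeeper/current/bin/zkCli.sh

and list the broker ids:

ls /cluster_name/brokers/ids

The answer should be:

[1,2,3,4,5,6]

All 6 brokers are in the cluster, so we can move on.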





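Now for the migration of the partitions itself. New brokers do not take over existing partitions on their own, so the replicas have to be reassigned to them explicitly.

First, create a json file with the list of topics to move: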





{"topics": [{"topic": "foo1"},
              {"topic": "foo2"}],
  "version":1
 }
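
For more than a couple of topics this file is easier to generate than to type. Below is a minimal sketch that assumes one topic name per line on standard input. The function name topics_to_json and the output file name are made up for this example, and this is not the author's tool. No json escaping is done, which is acceptable only because Kafka topic names are limited to letters, digits, dots, underscores and hyphens.

```shell
#!/bin/bash
# topics_to_json: hypothetical helper, reads topic names (one per line) from stdin
# and prints a topics-to-move json for kafka-reassign-partitions.sh --generate.
topics_to_json() {
    local first=1 topic
    printf '{"topics": ['
    while IFS= read -r topic
    do
        [ -z "$topic" ] && continue            # skip empty lines
        [ "$first" -eq 1 ] || printf ', '      # separator before every topic but the first
        printf '{"topic": "%s"}' "$topic"
        first=0
    done
    printf '], "version":1}\n'
}

# Usage: printf 'foo1\nfoo2\n' | topics_to_json > topics-to-move.json
```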
      
      



Pass this json to kafka-reassign-partitions.sh with the --generate option, giving the ids of the new brokers as --broker-list "4,5,6". The output looks like this:









generate1.json
{"version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2,3]},
                {"topic":"foo1","partition":0,"replicas":[3,2,1]},
                {"topic":"foo2","partition":2,"replicas":[1,2,3]},
                {"topic":"foo2","partition":0,"replicas":[3,2,1]},
                {"topic":"foo1","partition":1,"replicas":[2,3,1]},
                {"topic":"foo2","partition":1,"replicas":[2,3,1]}]
  }

  Proposed partition reassignment configuration

  {"version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[5,4,6]},
                {"topic":"foo1","partition":0,"replicas":[4,5,6]},
                {"topic":"foo2","partition":2,"replicas":[6,4,5]},
                {"topic":"foo2","partition":0,"replicas":[4,5,6]},
                {"topic":"foo1","partition":1,"replicas":[5,4,6]},
                {"topic":"foo2","partition":1,"replicas":[4,5,6]}]
  }
      
      







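We need the part after "Proposed partition reassignment configuration" (the new assignment). The part before it is the current assignment, which is what a rollback needs. Each part has to be saved into its own json file.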
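
Splitting the --generate output into the two parts can be done with standard tools. The sketch below assumes that the output was saved to a text file and contains the line "Proposed partition reassignment configuration" exactly once, as in the listing above. The function names and the rollback file name are made up for this example, and this is not the author's helper.

```shell
#!/bin/bash
marker="Proposed partition reassignment configuration"

# Non-empty lines after the marker: the new assignment (input for --execute)
proposed_part() {
    awk -v m="$marker" 'found && NF { print } index($0, m) { found = 1 }' "$1"
}

# Non-empty lines before the marker: the current assignment (input for a rollback).
# The "Current partition replica assignment" caption, if present, is skipped.
current_part() {
    awk -v m="$marker" '
        index($0, m) { exit }
        NF && $0 !~ /Current partition replica assignment/ { print }' "$1"
}

# Usage (the rollback path is hypothetical):
# proposed_part generated/generated1.txt > execute/execute1.json
# current_part  generated/generated1.txt > rollback/rollback1.json
```

If the tool prints warnings or other extra lines, they would end up in the json as well, so the resulting files are worth checking before use.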





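With a lot of topics, doing this by hand is not realistic, so the topics are moved in batches and the preparation is scripted. The kafka-reassign-helper.jar utility produces the json files for each batch.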









prepare-for-reassignment.sh
# Takes one argument, which is passed on to "kafka-reassign-helper.jar generate" (for example 20)
if [ "$#" -eq 0 ]
then
    echo "no arguments"
    exit 1
fi

echo "Start reassignment preparing"

# Overwrite rather than append, so that a rerun does not duplicate topics
/opt/kafka/current/bin/kafka-topics.sh --list --zookeeper localhost:2181/cluster_name > topics.txt

echo created topics.txt

# Produces generate1.json, generate2.json ... from topics.txt
java -jar kafka-reassign-helper.jar generate topics.txt "$1"

fileCount=$(ls -dq generate*.json | wc -l)

echo "created $fileCount file for topics to move"

echo -e "\nCreating generated files\n"

mkdir -p generated
for ((i = 1; i <= fileCount; i++))
do
    # Ask Kafka for a proposed assignment of this batch to brokers 4,5,6
    /opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --broker-list "4,5,6" --topics-to-move-json-file "generate$i.json" --generate > "generated/generated$i.txt"
    echo "generated/generated$i.txt created"
done

echo -e "\nCreating execute/rollback files"

# Produces the execute/rollback json files from the generated output
java -jar kafka-reassign-helper.jar execute $fileCount

echo -e "\nexecute/rollback files created"

echo -e "\nPreparing finished successfully!"
      
      







The reassignment itself is started by kafka-reassign-partitions.sh with the --execute option:

move-partitions.sh
# Argument: the number N of the file to apply (execute1.json, execute2.json ....)
if [ "$#" -eq 0 ]
then
    echo "no arguments"
    exit 1
fi

/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file "execute/execute$1.json" --execute
      
      







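The command returns right away, while the reassignment itself continues in the background. To find out whether it has finished, run kafka-reassign-partitions.sh with the same json and the --verify option.

To avoid running it by hand over and over, the script below polls the status until nothing is left in progress: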





reassign-verify.sh
# Argument: the number N of the file execute/executeN.json
progress=-1

while [ "$progress" -ne 0 ]
do
    # Run --verify once per iteration, so that all three counters describe the same snapshot
    output=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file "execute/execute$1.json" --verify)

    progress=$(echo "$output" | grep -c "in progress")
    complete=$(echo "$output" | grep -c "is complete")
    failed=$(echo "$output" | grep -c "failed")

    echo "In progress: $progress"
    echo "Is complete: $complete"
    echo "Failed: $failed"

    sleep 2s

done
      
      







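When the script stops, In progress is 0. Is complete should be equal to the number of partitions in the file, and Failed should be 0.

Run move-partitions.sh and reassign-verify.sh for each execute file in turn until all partitions have been moved.

Then look into the log.dirs directory on the old brokers: no partition data should be left there.

Now the old nodes can be stopped. Use kafka-server-stop.sh for the brokers and zkStop.sh for ZooKeeper.

After that, remove the old servers from zoo.cfg on the new nodes: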





zk-remove-old-servers.sh
# Delete the server.1, server.2 and server.3 lines; the anchored pattern cannot hit other entries
sed -i '/^server\.[123]=/d' /opt/zookeeper/current/conf/zoo.cfg
      
      







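Restart ZooKeeper on the new servers and check the status again. The expected result: 2 followers and 1 leader.

The pinned protocol version can now be removed from server.properties on the new brokers: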





remove-old-protocol.sh
sed -i '/^inter\.broker\.protocol\.version=2\.0\.0/d' /opt/kafka/config/server.properties
      
      







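Before restarting the brokers, make sure that all replicas are insync. This matters because of min.insync.replicas (2 in our case) and default.replication.factor (3 in our case).

If a partition has only 2 insync replicas out of 3 and we stop one of the brokers holding them, the partition is left with fewer insync replicas than min.insync.replicas requires.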
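
The same condition can be written without hard-coding broker ids: a partition is fully insync when its Isr list is as long as its Replicas list. The sketch below applies this to lines in the format printed by kafka-topics.sh --describe. The function name under_replicated is made up for this example, and the field layout ("Replicas: ..." followed by "Isr: ...", separated by whitespace) is an assumption based on Kafka 2.x output.

```shell
#!/bin/bash
# under_replicated: hypothetical filter, reads "kafka-topics.sh --describe" output
# on stdin and prints only the partitions whose ISR is shorter than the replica list.
under_replicated() {
    awk '
    {
        replicas = ""; isr = ""
        for (i = 1; i < NF; i++) {
            if ($i == "Replicas:") replicas = $(i + 1)
            if ($i == "Isr:")      isr = $(i + 1)
        }
        if (replicas == "") next     # topic summary line, not a partition
        if (split(isr, a, ",") < split(replicas, b, ",")) print
    }'
}

# Usage:
# /opt/kafka/current/bin/kafka-topics.sh --describe --zookeeper localhost:2181/cluster_name | under_replicated
```

An empty result means that every partition has all of its replicas in the ISR.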









check-insync.sh
# Counts partitions with fewer than 3 replicas in the ISR (replication factor 3 is assumed)
input="check-topics.txt"
kafkaTopics=/opt/kafka/current/bin/kafka-topics.sh
zk=localhost:2181/cluster_name

"$kafkaTopics" --list --zookeeper "$zk" > "$input"

checkPerIter=100
i=0
list=""
notInsync=0

# Describe one batch of topics ("a|b|c") and print the partitions with 1 or 2 replicas in the ISR
check_batch() {
    echo "checking $1"
    found=$("$kafkaTopics" --describe --topic "$1" --zookeeper "$zk" | grep -E "Isr: [0-9]+(,[0-9]+)?$")
    if [ -n "$found" ]
    then
        echo "$found"
        count=$(echo "$found" | wc -l)
        ((notInsync=notInsync+count))
    fi
}

while IFS= read -r line
do
    ((i=i+1))
    list+="${line}|"
    if [ "$i" -eq "$checkPerIter" ]
    then
        check_batch "${list%|}"
        list=""
        i=0
    fi
done < "$input"

# The last batch is usually incomplete, check it as well
if [ -n "$list" ]
then
    check_batch "${list%|}"
fi

echo "not insync: $notInsync"
      
      







If the output is not insync: 0, we can restart the brokers one by one.





That's all. The migration of brokers is now complete, except for the subsequent reconfiguration of monitoring and other auxiliary matters.





This is the instruction I sent to the admins, who carried it all out in production. Surprisingly, they got it right the first time, with no questions asked.





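README.txt
    Migration of Kafka 2.0.0 -> 2.6.0

1. Preparing the new servers

1.1 Copy the archives to the new servers

NEW4.tar.gz -> server 4
migration.tar.gz -> server 4

NEW5.tar.gz -> server 5
NEW6.tar.gz -> server 6

1.2 Unpack the archives

tar -xf NEW4.tar.gz -C /home/kafka
tar -xf NEW5.tar.gz -C /home/kafka
tar -xf NEW6.tar.gz -C /home/kafka

1.3 As root, run the setup script on each new server

/home/kafka/scripts/setup.sh

1.4 Log in as the kafka user

1.5 Check the settings

ulimit -n  = 262144
echo $JAVA_HOME  = /opt/java
echo $PATH  contains /opt/java/bin

The owner of /opt must be kafka kafka

2. Merging the clusters

2.1 Log in as the kafka user (on the new servers)

2.2 Start zookeeper on the new servers

/opt/scripts/zkStart.sh

2.3 Copy the archives to the old servers

OLD1.tar.gz -> server 1
OLD2.tar.gz -> server 2
OLD3.tar.gz -> server 3

2.4 Unpack the archives

tar -xf OLD1.tar.gz -C /home/kafka
tar -xf OLD2.tar.gz -C /home/kafka
tar -xf OLD3.tar.gz -C /home/kafka

2.5 As root, run the setup script on each old server

/home/kafka/scripts/setup-old.sh

2.6 Log in as the kafka user on the old servers

2.7 Add the new servers to the zookeeper config (on the old servers)

/home/kafka/scripts/zk-add-new-servers.sh

2.8 Check the zookeeper status (on the old servers)

/home/kafka/scripts/zkStatus.sh

2.9 Restart zookeeper on the old servers

/home/kafka/scripts/zkRestart.sh

2.10 Check the zookeeper status on all servers

/home/kafka/scripts/zkStatus.sh (old servers)
/opt/scripts/zkStatus.sh (new servers)

Every server should be a follower, except for one leader

2.11 Start the new brokers

/opt/kafka-start.sh

2.12 Check that the new brokers have joined the cluster

/home/kafka/migration/zkCli.sh

ls /cluster_name/brokers/ids

The answer should be [1,2,3,4,5,6]

3. Migrating the partitions

3.1 Prepare the files
/home/kafka/migration/prepare-for-assignment.sh  20

3.2 Apply the files from /home/kafka/migration/execute one after another

Example for the file execute1.json:
/home/kafka/migration/move-partitions.sh 1

Check the progress:
/home/kafka/migration/reassign-verify.sh 1

When "in progress" is 0, repeat step 3.2 for the next file

3.3 When all the files have been applied, check that no partitions are left in /opt/kafka/kafka-data on the old brokers

3.4 Stop the old brokers
/opt/kafka/current/bin/kafka-server-stop.sh

3.5 Stop zookeeper on the old servers

/home/kafka/scripts/zkStop.sh

3.6 Remove the old servers from the zookeeper config (on the new servers)
/opt/scripts/zk-remove-old-servers.sh

3.7 Restart zookeeper on the new servers

/opt/scripts/zkRestart.sh

3.8 Check the zookeeper status (one leader, 2 followers)

/opt/scripts/zkStatus.sh

3.9 Remove the old protocol version (on the new brokers)
/opt/scripts/remove-old-protocol.sh

3.10 Check that all replicas are insync

    /home/kafka/migration/check-insync.sh

not insync must be 0

3.11 Restart the new brokers one by one

/opt/kafka/current/bin/kafka-server-stop.sh

Wait until the process has stopped (ps aux | grep kafka shows no broker process)

 /opt/kafka-start.sh

Repeat step 3.10 before moving on to the next broker

Done!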
      
      







I hope this helps someone facing the same task. I look forward to your comments and remarks.





All scripts and instructions, including those for rollbacks, can be found here







