본문 바로가기
프로젝트/kafka를 활용한 주식 알람

[Kafka] AWS EC2에 카프카 클러스터 설치하기 | LIM

by forestlim 2023. 1. 24.
728x90
반응형

본격 프로젝트를 시작하기에 앞서 카프카를 설치하는데 도커로 설치하는 것도 좋겠지만 서버에 직접 설치하는 것도 재밌을 것 같아서 AWS EC2를 이용하여 설치해보았다. 회사에서 요즘 FTP 서버를 구축하면서 리눅스 쓰면서 이리저리 많이 부딪혔는데 그때의 경험들이 도움이 많이 되었다. 또한 카프카로 유명하신 데브원영님이 AWS에 카프카 클러스터 설치하는 방법을 자세하게 올려주셔서 참고해서 구축해보았다. 

https://blog.voidmainvoid.net/325

 

AWS에 카프카 클러스터 설치하기(ec2, 3 brokers)

보통 테스트할때 맥북 또는 윈도우 컴퓨터의 1대 장비에 설치하곤하는데요. 고 가용성 테스트를 하기 위해서는 반드시 3대 이상의 클러스터를 설치해야 완벽한 카프카클러스터로서 테스트가 가

blog.voidmainvoid.net

 

1. AWS EC2 생성

AWS EC2는 GCP Compute Engine 에 VM(Virtual Machine) 과 동일하다고 생각하면 된다. 나는 프리티어용으로 t2.micro 로 구축했다. 

Base Image: Ubuntu20.04
Instance Typte: t2.micro

https://velog.io/@newon-seoul/AWS-EC2%EC%97%90-%EB%A6%AC%EB%88%85%EC%8A%A4-%EA%B8%B0%EB%B0%98%EC%9A%B0%EB%B6%84%ED%88%AC-20.04LTS-%EC%9D%B8%EC%8A%A4%ED%84%B4%EC%8A%A4-%EC%83%9D%EC%84%B1%ED%95%98%EA%B8%B0

 

AWS EC2에 (리눅스 기반)우분투 20.04LTS 인스턴스 생성하기

EC2 우분투 인스턴스 사용하기

velog.io

 

주키퍼 앙상블을 3대로 구성했기 때문에 1대 정도의 주키퍼 서버 장애에도 안정적으로 서비스를 제공할 수 있다. 과반수 이상 서버에 장애가 발생하게 되면 (3대일 경우 2대 이상) 서비스 장애 상태에 빠지게 된다. 

 

 

2. 방화벽 설정

zookeeper 와 kafka cluster 가 서로 통신하기 위해서는 inbound 규칙을 생성해야 한다. 방화벽 설정은 Security Group 에서 수정할 수 있다. 생성한 kafka 인스턴스들이 속해있는 Security Group Name 을 찾아서 inbound 규칙을 추가해주면 된다. 

 

  • 주키퍼의 기본 TCP 포트는 2181
  • 카프카의 기본 TCP 포트는 9092
  • 주키퍼 앙상블 구성을 위한 포트(2888, 3888)

각각의 관련된 포트들을 미리 설정해서 열어두도록 하자.

 

그 다음 SSH 로 Instance 에 접속해서 각 host 별로 /etc/hosts 에 Public Ip 및 host 명을 지정한다. 자기 자신은 0.0.0.0 으로 하고 나머지 서버들은 AWS의 public ip 로 지정한다. 

 

 

3. Java 설치

kafka 를 사용하기 위해서는 Java 가 설치 되어 있어야 한다. 아래 블로그 참고해서 바로 설치할 수 있었다.

https://davelogs.tistory.com/71

 

우분투(Ubuntu)에서 터미널로 JAVA 설치하기

Ubuntu 20.04 환경에 Java를 설치하고자 한다. 직접 설치파일을 다운로드받아 설치할 수도 있지만, 여기서는 터미널 환경에서 설치하는 방법으로 진행했다. 1. 설치 $ sudo apt-get update $ sudo apt-get upgrade

davelogs.tistory.com

 


4. Zookeeper 설치

3개의 인스턴스(노드) 에 동일하게 작업해주면 된다. 

$ wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz

 

압축 풀기

$ tar xvf zookeeper-3.4.12.tar.gz

 

zookeeper 의 configuration 설정. 설치한 zookeeper 폴더 내부 conf 폴더로 이동하여 zoo.cfg 파일을 생성 후 다음과 같은 내용을 적어준다.

tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=20
syncLimit=5
server.1=kafka1:2888:3888
server.2=kafka2:2888:3888
server.3=kafka3:2888:3888
  • tickTime: 주기퍼가 사용하는 시간에 대한 기본 측정 단위(밀리초)
  • initLimit: 팔로워가 리더와 초기에 연결하는 시간에 대한 타임 아웃 tick 의 수
  • syncLimit: 팔로워가 리더와 동기화 하는 시간에 대한 타임 아웃 tick의 수(주키퍼에 저장된 데이터가 크면 수를 늘려야 한다.)
  • dataDir: 주키퍼의 트랜잭션 로그와 스냅샷이 저장되는 데이터 저장 경로.
  • clientPort: 주키퍼 사용 TCP 포트
  • server.x: 주키퍼 앙상블 구성을 위한 서버 설정이며, server.myid 형식으로 사용

여기서 2888, 3888은 기본 포트이며 앙상블 내 노드끼리 연결하는데 사용하고, 리더 선출에 사용

 

 

다음으로 zookeeper 앙상블을 만들기 위해 각 zookeeper 마다 myid 라는 파일을 만들어줘야 한다. myid 위치는 나같은 경우 /var/lib/zookeepr/myid 이고, 해당 파일에 숫자를 넣으면 된다. kafka1의 경우 1, kafka2의 경우 2, kafka3의 경우 3을 넣어주었다.

 

$ cat /var/lib/zookeeper/myid

3

 

🏃‍♀️ 주키퍼를 실행해보자!

$ /var/lib/zookeeper/bin/zkServer.sh start

ZooKeeper JMX enabled by default
Using config: /var/lib/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... already running as process 18794.

 

5. Kafka 설치

kafka 설치 버전은 2.1.0

$ wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.11-2.1.0.tgz

 

 

zookeeper 와 마찬가지로 압축을 풀어준다.

$ tar xvf kafka_2.11-2.1.0.tgz

 

kafka 실행을 위해서 broekr.id 설정, zookeeper 에 대한 설정과 listener 설정을 다음과 같이 한다. 대상 파일은 위에 설치한 kafka 폴더 내부 kafka/config/server.properties 이다. 나는 로컬에서 kafka 서버로 연결할거기 때문에 advertised.listeners 에 각각 서버의 public ip dns를 넣어주었다. 

 

블로그 글에 이러한 말이 있었는데 사실 아직까지 이 부분은 잘 이해하지 못했다.

참고로 zookeeper 설정시 마지막에 /test 와 같이 route를 넣는 것을 추천드립니다. 이렇게 넣을 경우 zookeeper의 root node가 아닌 child node에 카프카정보를 저장하게 되므로 유지보수에 이득이 있습니다.

 

🏃‍♀️kafka 실행!

$ /var/lib/kafka/bin/kafka-server-start.sh ./config/server.properties

 

😞 참고) 맞닥뜨릴 수 있는 에러

(Memory Allocation)

원인은 JVM 메모리 부족이다. 

kafka/bin/kafka-server-starth.sh 파일에서 힙메모리 사이즈를 조절해주면 된다. 

아니면 .bashrc 에 환경변수로 등록해놔도 된다.

이 부분을 -Xmx256M -Xms128M 로 바꿔주었다. 현재 t2.micro 머신을 사용하고 있기 때문에 1vCPU, 1GiB Memory 이기 때문에 다음과 같이 설정해주었다.

 

JVM 힙 사이즈
카프카는 자바 기반의 JVM 을 사용하는 애플리케이션으로서 자바 기반 애플리케이션을 시작할 때 메모리가 할당되는 영역인 힙(heap) 이 만들어집니다. 카프카는 기본으로 1GB의 힙 메모리를 사용하도록 설정되어 있고, 설정 파일에서 이 값을 변경할 수 있습니다. 설정 변경 방법은 JMX 설정 방법을 참고해 KAFKA_HEAP_OPTS='-Xmx6G -Xms6G' 로 추가하면 됩니다. 컨플루엍느 사의 도큐먼트에 따르면, 카프카는 초당 메시지 단위, 메가비트 단위를 처리함에 있어 5GB의 힙 메모리면 충분하고, 남아 있는 메모리는 페이지 캐시로 사용하기를 권장합니다. 또한 페이지 캐시를 서로 공유해야 하기 때문에 하나의 시스템에 카프카를 다른 애플리케이션과 함께 실행하는 것은 권장하지 않습니다.

 

6. consumer, producer 테스트

이제 본격적으로 나의 로컬(맥북) 환경에서 kafka 클러스터로 consumer, producer 테스트를 해보자.

먼저 토픽을 생성해야 한다. 토픽을 생성하는 명령어는 다음과 같다. 

$ /var/lib/kafka/bin/kafka-topics.sh --create --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --replication-factor 3 --partitions 1 --topic mytopic

 

다음과 같이 kafka 서버 내에서 producer 와 consumer 를 테스트해볼 수 있고

$ /var/lib/kafka/bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 \
  --topic mytopic
> This is a message
> This is another message


$ /var/lib/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 \
  --topic mytopic
  --topic test --from-beginning
This is a message
This is another message

 

 

나는 파이썬 내부에서 Kafka Client 를 구현하여 테스트해보았다. 

 

producer.py

 

 

consumer.py

 

728x90
반응형

댓글