실시간 구축 : Apache Kafka를 사용한 빅 데이터 메시징, 1 부

빅 데이터 이동이 시작되었을 때 주로 일괄 처리에 중점을 두었습니다. MapReduce, Hive 및 Pig와 같은 분산 데이터 저장 및 쿼리 도구는 모두 연속이 아닌 일괄 처리로 데이터를 처리하도록 설계되었습니다. 기업은 매일 밤 여러 작업을 실행하여 데이터베이스에서 데이터를 추출한 다음 데이터를 분석, 변환 및 저장합니다. 최근에 기업들은 몇 시간에 한 번만 이 아니라 데이터와 이벤트 가 발생하는 즉시 분석하고 처리하는 능력을 발견했습니다 . 그러나 대부분의 기존 메시징 시스템은 실시간으로 빅 데이터를 처리하도록 확장되지 않습니다. 따라서 LinkedIn의 엔지니어는 상용 하드웨어에서 확장하여 빅 데이터의 요구를 충족하는 분산 메시징 프레임 워크 인 Apache Kafka를 구축하고 오픈 소스로 제공했습니다.

지난 몇 년 동안 Apache Kafka는 다양한 사용 사례를 해결하기 위해 등장했습니다. 가장 단순한 경우 애플리케이션 로그를 저장하기위한 간단한 버퍼가 될 수 있습니다. Spark Streaming과 같은 기술과 결합하면 데이터 변경 사항을 추적하고 최종 대상에 저장하기 전에 해당 데이터에 대한 조치를 취하는 데 사용할 수 있습니다. Kafka의 예측 모드는 신용 카드 거래가 발생했을 때 신용 카드 거래의 유효성을 확인하고 나중에 일괄 처리 시간을 기다리지 않는 등 사기를 감지하는 강력한 도구입니다.

이 두 부분으로 구성된 자습서에서는 개발 환경에서 Kafka를 설치하고 실행하는 방법부터 시작하여 Kafka를 소개합니다. Kafka의 아키텍처에 대한 개요를 확인한 후 즉시 사용 가능한 Apache Kafka 메시징 시스템 개발에 대해 소개합니다. 마지막으로 Kafka 서버를 통해 메시지를 보내고 소비하는 사용자 지정 생산자 / 소비자 애플리케이션을 빌드합니다. 튜토리얼의 후반부에서는 메시지를 분할하고 그룹화하는 방법과 Kafka 소비자가 소비 할 메시지를 제어하는 ​​방법을 배웁니다.

Apache Kafka 란 무엇입니까?

Apache Kafka는 빅 데이터에 맞게 확장 할 수 있도록 구축 된 메시징 시스템입니다. Apache ActiveMQ 또는 RabbitMq와 마찬가지로 Kafka를 사용하면 서로 다른 플랫폼에 구축 된 애플리케이션이 비동기 메시지 전달을 통해 통신 할 수 있습니다. 그러나 Kafka는 다음과 같은 주요 측면에서보다 전통적인 메시징 시스템과 다릅니다.

  • 더 많은 상용 서버를 추가하여 수평으로 확장하도록 설계되었습니다.
  • 생산자와 소비자 프로세스 모두에 훨씬 더 높은 처리량을 제공합니다.
  • 일괄 및 실시간 사용 사례를 모두 지원하는 데 사용할 수 있습니다.
  • Java의 메시지 지향 미들웨어 API 인 JMS는 지원하지 않습니다.

Apache Kafka의 아키텍처

Kafka의 아키텍처를 살펴보기 전에 기본 용어를 알아야합니다.

  • 생산자는 주제에 메시지를 게시 할 수 있습니다 과정이다.
  • 소비자는 하나 개 이상의 주제에 가입 주제에 게시 된 메시지를 소비 할 수있는 과정이다.
  • 주제 범주는 메시지가 게시되는 피드의 이름입니다.
  • 브로커는 하나의 시스템에서 실행중인 프로세스이다.
  • 클러스터는 협력 브로커의 그룹입니다.

Apache Kafka의 아키텍처는 매우 간단하여 일부 시스템에서 더 나은 성능과 처리량을 가져올 수 있습니다. Kafka의 모든 주제는 단순한 로그 파일과 같습니다. 생산자가 메시지를 게시하면 Kafka 서버는 해당 주제에 대한 로그 파일 끝에 메시지를 추가합니다. 서버는 또한 각 메시지를 영구적으로 식별하는 데 사용되는 번호 인 offset 을 할당 합니다. 메시지 수가 증가하면 각 오프셋 값이 증가합니다. 예를 들어 생산자가 세 개의 메시지를 게시하는 경우 첫 번째 메시지는 오프셋 1, 두 번째 메시지는 오프셋 2, 세 번째 메시지는 오프셋 3이 될 수 있습니다.

Kafka 소비자가 처음 시작되면 서버에 pull 요청을 보내 오프셋 값이 0보다 큰 특정 주제에 대한 메시지를 검색하도록 요청합니다. 서버는 해당 주제에 대한 로그 파일을 확인하고 3 개의 새 메시지를 반환합니다. . 소비자는 메시지를 처리 ​​한 다음 오프셋 이 3 보다 큰 메시지에 대한 요청을 보냅니다 .

Kafka에서 클라이언트는 오프셋 수를 기억하고 메시지를 검색합니다 .Kafka 서버는 메시지 소비를 추적하거나 관리하지 않습니다. 기본적으로 Kafka 서버는 7 일 동안 메시지를 보관합니다. 서버의 백그라운드 스레드는 7 일 이상 된 메시지를 확인하고 삭제합니다. 소비자는 서버에있는 한 메시지에 액세스 할 수 있습니다. 메시지를 여러 번 읽을 수 있으며 수신 순서의 역순으로 메시지를 읽을 수도 있습니다. 그러나 소비자가 7 일이 지나기 전에 메시지를 검색하지 못하면 해당 메시지를 놓치게됩니다.

Kafka 벤치 마크

LinkedIn 및 기타 기업의 프로덕션 사용에 따르면 Apache Kafka는 적절한 구성을 통해 매일 수백 기가 바이트의 데이터를 처리 할 수 ​​있습니다. 2011 년에 3 명의 LinkedIn 엔지니어가 벤치 마크 테스트를 사용하여 Kafka가 ActiveMQ 및 RabbitMQ보다 훨씬 높은 처리량을 달성 할 수 있음을 입증했습니다.

Apache Kafka 빠른 설정 및 데모

이 자습서에서는 사용자 지정 애플리케이션을 빌드 할 것이지만 기본 제공되는 생산자 및 소비자를 사용하여 Kafka 인스턴스를 설치하고 테스트하는 것으로 시작하겠습니다.

  1. Kafka 다운로드 페이지를 방문하여 최신 버전 (이 문서 작성 시점에서 0.9)을 설치하십시오.
  2. 바이너리를 software/kafka폴더 로 추출하십시오 . 현재 버전의 경우 software/kafka_2.11-0.9.0.0.
  3. 새 폴더를 가리 키도록 현재 디렉터리를 변경합니다.
  4. 다음 명령을 실행하여 Zookeeper 서버를 시작합니다 bin/zookeeper-server-start.sh config/zookeeper.properties..
  5. 다음을 실행하여 Kafka 서버를 시작합니다 bin/kafka-server-start.sh config/server.properties..
  6. 테스트에 사용할 수있는 테스트 주제를 만듭니다 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic javaworld..
  7. 같은 주어진 주제에 게시 된 메시지를 소비 할 수있는 간단한 콘솔 소비자를 시작합니다 javaworld: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic javaworld --from-beginning.
  8. 테스트 주제에 메시지를 게시 할 수있는 간단한 생산자 콘솔을 시작합니다 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic javaworld.
  9. 생산자 콘솔에 하나 또는 두 개의 메시지를 입력 해보십시오. 소비자 콘솔에 메시지가 표시되어야합니다.

Apache Kafka를 사용한 예제 애플리케이션

Apache Kafka가 기본적으로 작동하는 방식을 확인했습니다. 다음으로 맞춤형 생산자 / 소비자 애플리케이션을 개발해 보겠습니다. 생산자는 콘솔에서 사용자 입력을 검색하고 각각의 새 줄을 Kafka 서버에 메시지로 보냅니다. 소비자는 주어진 주제에 대한 메시지를 검색하여 콘솔에 인쇄합니다. 이 경우 생산자와 소비자 구성 요소는 당신의 자신의 구현입니다 kafka-console-producer.shkafka-console-consumer.sh.

Producer.java클래스 를 생성하여 시작하겠습니다 . 이 클라이언트 클래스에는 콘솔에서 사용자 입력을 읽고 해당 입력을 Kafka 서버에 메시지로 보내는 논리가 포함되어 있습니다.

We configure the producer by creating an object from the java.util.Properties class and setting its properties. The ProducerConfig class defines all the different properties available, but Kafka's default values are sufficient for most uses. For the default config we only need to set three mandatory properties:

  • BOOTSTRAP_SERVERS_CONFIG
  • KEY_SERIALIZER_CLASS_CONFIG
  • VALUE_SERIALIZER_CLASS_CONFIG

BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers) sets a list of host:port pairs used for establishing the initial connections to the Kakfa cluster in the host1:port1,host2:port2,... format. Even if we have more than one broker in our Kafka cluster, we only need to specify the value of the first broker's host:port. The Kafka client will use this value to make a discover call on the broker, which will return a list of all the brokers in the cluster. It's a good idea to specify more than one broker in the BOOTSTRAP_SERVERS_CONFIG, so that if that first broker is down the client will be able to try other brokers.

The Kafka server expects messages in byte[] key, byte[] value format. Rather than converting every key and value, Kafka's client-side library permits us to use friendlier types like String and int for sending messages. The library will convert these to the appropriate type. For example, the sample app doesn't have a message-specific key, so we'll use null for the key. For the value we'll use a String, which is the data entered by the user on the console.

To configure the message key, we set a value of KEY_SERIALIZER_CLASS_CONFIG on the org.apache.kafka.common.serialization.ByteArraySerializer. This works because null doesn't need to be converted into byte[]. For the message value, we set VALUE_SERIALIZER_CLASS_CONFIG on the org.apache.kafka.common.serialization.StringSerializer, because that class knows how to convert a String into a byte[].

Custom key/value objects

Similar to StringSerializer, Kafka provides serializers for other primitives such as int and long. In order to use a custom object for our key or value, we would need to create a class implementing org.apache.kafka.common.serialization.Serializer. We could then add logic to serialize the class into byte[]. We would also have to use a corresponding deserializer in our consumer code.

The Kafka producer

After filling the Properties class with the necessary configuration properties, we can use it to create an object of KafkaProducer. Whenever we want to send a message to the Kafka server after that, we'll create an object of ProducerRecord and call the KafkaProducer's send() method with that record to send the message. The ProducerRecord takes two parameters: the name of the topic to which message should be published, and the actual message. Don't forget to call the Producer.close() method when you're done using the producer:

Listing 1. KafkaProducer

 public class Producer { private static Scanner in; public static void main(String[] argv)throws Exception { if (argv.length != 1) { System.err.println("Please specify 1 parameters "); System.exit(-1); } String topicName = argv[0]; in = new Scanner(System.in); System.out.println("Enter message(type exit to quit)"); //Configure the Producer Properties configProperties = new Properties(); configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties); String line = in.nextLine(); while(!line.equals("exit")) { ProducerRecord rec = new ProducerRecord(topicName, line); producer.send(rec); line = in.nextLine(); } in.close(); producer.close(); } } 

Configuring the message consumer

Next we'll create a simple consumer that subscribes to a topic. Whenever a new message is published to the topic, it will read that message and print it to the console. The consumer code is quite similar to the producer code. We start by creating an object of java.util.Properties, setting its consumer-specific properties, and then using it to create a new object of KafkaConsumer. The ConsumerConfig class defines all the properties that we can set. There are just four mandatory properties:

  • BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)
  • KEY_DESERIALIZER_CLASS_CONFIG (key.deserializer)
  • VALUE_DESERIALIZER_CLASS_CONFIG (value.deserializer)
  • GROUP_ID_CONFIG (bootstrap.servers)

Just as we did for the producer class, we'll use BOOTSTRAP_SERVERS_CONFIG to configure the host/port pairs for the consumer class. This config lets us establish the initial connections to the Kakfa cluster in the host1:port1,host2:port2,... format.

As I previously noted, the Kafka server expects messages in byte[] key and byte[] value formats, and has its own implementation for serializing different types into byte[]. Just as we did with the producer, on the consumer side we'll have to use a custom deserializer to convert byte[] back into the appropriate type.

예제 응용 프로그램의 경우 생산자가 ByteArraySerializer키와 StringSerializer값으로 사용하고 있음 을 알고 있습니다. 따라서 클라이언트 측에서는 org.apache.kafka.common.serialization.ByteArrayDeserializer키와 org.apache.kafka.common.serialization.StringDeserializer값 을 사용해야 합니다. 의 값으로 그 클래스를 설정 KEY_DESERIALIZER_CLASS_CONFIG하고 VALUE_DESERIALIZER_CLASS_CONFIG역 직렬화하는 소비자를 가능하게 할 것이다 byte[]생산자에 의해 전송 인코딩 유형을.

마지막으로 GROUP_ID_CONFIG. 문자열 형식의 그룹 이름이어야합니다. 잠시 후에이 구성에 대해 자세히 설명하겠습니다. 지금은 네 가지 필수 속성이 설정된 Kafka 소비자를 살펴 보겠습니다.