Redis Streams에서 소비자 그룹을 사용하는 방법

Roshan Kumar는 Redis Labs의 선임 제품 관리자입니다.

Redis Streams는 Redis 5.0에 도입 된 새로운 데이터 구조로, 데이터 스트림을 생성하고 관리 할 수 ​​있습니다. 이전 기사에서 스트림에 데이터를 추가하는 방법과 여러 방법으로 데이터를 읽는 방법을 설명했습니다. 이 기사에서는 Redis Streams에서 소비자 그룹을 사용하는 방법을 설명합니다. 소비자 그룹은 처리 속도를 높이거나 느린 소비자의 부하를 줄이기 위해 여러 클라이언트간에 메시지 스트림을 분할하는 방법입니다.

완벽한 세상에서 데이터 생산자와 소비자는 같은 속도로 작업하며 데이터 손실이나 데이터 백 로그가 없습니다. 불행히도 현실 세계에서는 그렇지 않습니다. 거의 모든 실시간 데이터 스트림 처리 사용 사례에서 생산자와 소비자는 서로 다른 속도로 작업합니다. 또한 각각 고유 한 요구 사항과 처리 속도를 가진 두 가지 이상의 소비자 유형이 있습니다. Redis Streams는 소비자 지원에 크게 끌리는 기능 세트로 이러한 요구를 해결합니다. 가장 중요한 기능 중 하나는 소비자 그룹입니다.

Redis Streams 소비자 그룹을 사용하는 경우

소비자 그룹의 목적은 데이터 소비 프로세스를 확장하는 것입니다. 이미지 처리 응용 프로그램을 예로 들어 보겠습니다. 이 솔루션에는 세 가지 주요 구성 요소가 필요합니다.

  1. 이미지를 캡처하고 저장하는 제작자 (아마도 하나 이상의 카메라).
  2. 이미지가 도착한 순서대로 (스트림 데이터 저장소에) 이미지를 저장하는 Redis Stream 과
  3. 각 이미지를 처리하는 이미지 프로세서입니다. 
Redis Labs

생산자가 초당 500 개의 이미지를 저장하고 이미지 프로세서가 최대 용량으로 초당 100 개의 이미지 만 처리한다고 가정합니다. 이 속도 차이는 백 로그를 생성하고 이미지 프로세서가 따라 잡을 수 없습니다. 이 문제를 해결하는 쉬운 방법은 각각 상호 배타적 인 이미지 집합을 처리하는 5 개의 이미지 프로세서 (그림 2 참조)를 실행하는 것입니다. 소비자 그룹을 통해이를 달성 할 수 있으며,이를 통해 워크로드를 분할하고이를 다른 소비자에게 라우팅 할 수 있습니다.

Redis Labs

소비자 그룹은 데이터 파티셔닝 그 이상을 수행합니다. 데이터 안전을 보장하고 재해 복구를 가능하게합니다.

Redis Streams 소비자 그룹이 작동하는 방식

소비자 그룹은 Redis Stream 내의 데이터 구조입니다. 그림 3에서 볼 수 있듯이 소비자 그룹을 목록 모음으로 생각할 수 있습니다. 상상할 수있는 또 다른 것은 어떤 소비자도 소비하지 않는 품목 목록입니다. 논의를 위해 이것을 "소비되지 않은 목록"이라고합시다. 데이터가 스트림에 도착하면 즉시 사용되지 않은 목록으로 푸시됩니다.

Redis Labs

소비자 그룹은 일반적으로 애플리케이션이 연결된 각 소비자에 대해 별도의 목록을 유지합니다. 그림 3에서 우리의 솔루션에는 각각 소비자 1, 소비자 2,… 소비자 n을 통해 데이터를 읽는 N 개의 동일한 애플리케이션 (앱 1, 앱 2,…. 앱 n)이 있습니다.

앱이 XREADGROUP 명령을 사용하여 데이터를 읽을 때 특정 데이터 항목이 사용되지 않은 목록에서 제거되고 각 소비자에 속하는 보류중인 항목 목록으로 푸시됩니다. 따라서 두 소비자는 동일한 데이터를 사용하지 않습니다.

마지막으로 앱이 XACK 명령으로 스트림에 알림을 보내면 소비자의 보류중인 항목 목록에서 항목이 제거됩니다.

소비자 그룹의 기본 사항을 설명 했으므로 이제이 데이터 수명주기가 작동하는 방식을 자세히 살펴 보겠습니다.

Redis Streams 소비자 그룹 생성

아래와 같이 XGROUP CREATE 명령을 사용하여 새 소비자 그룹을 만들 수 있습니다.

XGROUP mystream mygroup $ MKSTREAM 만들기

XREAD와 마찬가지로 명령 끝에있는 $ 기호는 해당 시점부터 새 데이터 만 전달하도록 스트림에 지시합니다. 대체 옵션은 0 또는 스트림 항목의 다른 ID입니다. 0을 사용하는 경우 스트림은 스트림 시작부터 모든 데이터를 전달합니다.

MKSTREAM은 아직 존재하지 않는 경우 새 스트림 인 mystream을 생성합니다.

Redis Stream 데이터 읽기 및 관리

Redis Stream (mystream)이 있고 위에 표시된대로 이미 소비자 그룹 (mygroup)을 생성했다고 가정합니다. 이제 다음 예제와 같이 이름이 a, b, c, d, e 인 항목을 추가 할 수 있습니다.

XADD mystream * 이름 a

a부터 e까지의 이름에 대해이 명령을 실행하면 Redis Stream, mystream 및 소비자 그룹 mystream의 사용되지 않은 목록이 채워집니다. 이것은 그림 4에 설명되어 있습니다.

Redis Labs

여기에서 소비자 Alice와 Bob이 아직 작업을 시작하지 않았 음을 알 수 있습니다. 앱 A는 소비자 Alice를 통해 데이터를 소비하고 앱 B는 Bob을 통해 데이터를 소비합니다.

Redis Streams 데이터 사용

그룹에서 데이터를 읽는 명령은 XREADGROUP입니다. 이 예에서 앱 A가 데이터 처리를 시작하면 다음과 같이 소비자 (Alice)를 호출하여 데이터를 가져옵니다.

XREADGROUP GROUP mygroup COUNT 2 Alice STREAMS mystream>

마찬가지로 앱 B는 다음과 같이 Bob을 통해 데이터를 읽습니다.

XREADGROUP GROUP mygroup COUNT 2 Bob STREAMS mystream>

끝에있는 특수 문자>는 Redis Streams에게 다른 소비자에게 전달되지 않은 데이터 항목 만 가져 오도록 지시합니다. 또한 두 소비자가 동일한 데이터를 사용하지 않으므로 그림 5와 같이 데이터를 사용하지 않은 목록에서 Alice와 Bob으로 이동하게됩니다.

Redis Labs

보류중인 항목 목록에서 처리 된 메시지 제거

소비자의 보류중인 항목 목록에있는 데이터는 앱 A와 앱 B가 데이터를 성공적으로 사용했음을 Redis Streams에 확인할 때까지 그대로 유지됩니다. 이것은 XACK 명령을 사용하여 수행됩니다. 예를 들어 앱 A는 ID가 1526569411111-0 및 1526569411112-0 인 d와 e를 소비 한 후 다음과 같이 승인합니다.

XACK mystream mygroup 1526569411111-0 1526569411112-0

XREADGROUP과 XACK의 조합은 데이터 안전을 보장하는 트랜잭션을 시작하고 커밋하는 것과 유사합니다. 

XACK를 실행 한 후 App A가 아래와 같이 XREADGROUP을 실행했다고 가정합니다. 이제 데이터 구조는 그림 6과 같습니다.

XREADGROUP GROUP mygroup COUNT 2 Alice STREAMS mystream>
Redis Labs

실패에서 복구

b와 c를 처리하는 동안 실패로 인해 App B가 종료 된 경우 데이터 구조는 그림 7과 같습니다.

Redis Labs

이제 두 가지 옵션이 남았습니다.

1. 앱 B를 다시 시작하고 소비자 (Bob)에서 데이터를 다시로드합니다.

이 경우 앱 B는 XREADGROUP 명령을 사용하여 소비자 (Bob)에서 데이터를 읽어야하지만 한 가지 차이점이 있습니다. 끝에> 대신 App B는 0 (또는 처리 된 이전 데이터 항목보다 낮은 ID)을 전달합니다. >는 소비되지 않은 목록의 새 데이터를 소비자에게 보냅니다.

XREADGROUP GROUP mygroup COUNT 2 Bob STREAMS mystream 0

위의 명령은 소비자 Bob의 목록에 이미 저장된 데이터 항목을 검색합니다. 사용되지 않은 목록에서 새 데이터를 가져 오지 않습니다. 앱 B는 새 데이터를 가져 오기 전에 소비자 Bob의 모든 데이터를 반복 할 수 있습니다.

2. Alice가 Bob의 모든 데이터를 요청하고 앱 A를 통해 처리하도록합니다.

이는 노드, 디스크 또는 네트워크 장애로 인해 앱 B를 복구 할 수없는 경우 특히 유용합니다. 이러한 경우 다른 소비자 (예 : Alice)는 Bob의 데이터를 요청하고 해당 데이터를 계속 처리하여 서비스 중단 시간을 방지 할 수 있습니다. Bob의 데이터를 요청하려면 두 가지 명령 세트를 실행해야합니다.

XPENDING mystream mygroup-+ 10 Bob

이렇게하면 Bob에 대해 보류중인 모든 데이터 항목을 가져옵니다. 옵션-및 + 전체 범위를 가져옵니다. b와 c의 ID가 각각 1526569411113-0 및 1526569411114-0 인 경우 Bob의 데이터를 Alice로 이동하는 명령은 다음과 같습니다.

XCLAIM mystream mygroup Alice 0 1526569411113-0 1526569411114-0

소비자 그룹은 소비 목록의 데이터에 대해 실행중인 시계를 유지합니다. 예를 들어 앱 B가 b를 읽으면 Bob이 ACK를받을 때까지 시계가 시작됩니다. XCLAIM 명령의 시간 옵션을 사용하면 지정된 시간보다 오래 유휴 상태 인 데이터 만 이동하도록 소비자 그룹에 지시 할 수 있습니다. 위의 예와 같이 0을 전달하여 무시할 수도 있습니다. 이러한 명령의 결과는 그림 8에 나와 있습니다. XCLAIM은 소비자 프로세서 중 하나가 느려서 처리되지 않은 데이터의 백 로그가 발생하는 경우에도 유용합니다.

Redis Labs

이전 기사에서 Redis Streams를 사용하는 방법의 기본 사항을 다뤘습니다. 이 기사에서 좀 더 자세히 살펴보고 소비자 그룹을 사용하는시기와 작동 방식을 설명했습니다. Redis Streams의 소비자 그룹은 데이터 파티션, 수명주기 및 데이터 안전을 관리 할 때 부담을 줄여줍니다. 또한 소비자 그룹의 확장 기능은 많은 실시간 애플리케이션에 도움이 될 수 있습니다.

Redis Streams에 대한 세 번째 기사에서 Redis Streams와 Redis 용 Java 기반 오픈 소스 라이브러리 인 Lettuce를 사용하여 실시간 분류 애플리케이션을 개발하는 방법을 보여 드리겠습니다. 한편 Redis 프로젝트 웹 사이트에서 Redis Streams 자습서를 통해 더 많은 것을 배울 수 있습니다. 

Roshan Kumar는 Redis Labs 의 선임 제품 관리자  입니다. 그는 소프트웨어 개발 및 기술 마케팅 분야에서 광범위한 경험을 보유하고 있습니다. Roshan은 Hewlett-Packard와 ZillionTV, Salorix, Alopa 및 ActiveVideo를 포함한 많은 성공적인 실리콘 밸리 스타트 업에서 근무했습니다. 열정적 인 프로그래머 인 그는 어린 학생들을위한 컴퓨터 프로그래밍 과정을 호스팅하는 온라인 플랫폼 인 mindzeal.com을 설계하고 개발했습니다. Roshan은 Santa Clara University에서 컴퓨터 과학 학사 학위와 MBA를 받았습니다.

New Tech Forum은 새로운 엔터프라이즈 기술을 전례없이 깊이 있고 폭넓게 탐구하고 논의 할 수있는 장을 제공합니다. 선택은 우리가 중요하고 독자들에게 가장 큰 관심을 가지고 있다고 생각하는 기술을 선택하여 주관적입니다. 는 게시를위한 마케팅 자료를 허용하지 않으며 제공되는 모든 콘텐츠를 편집 할 권리를 보유합니다. 모든 문의 사항은 [email protected]으로 보내주십시오  .