본문 바로가기
AWS/MSK (Kafka)

Integrating S3 and DocumentDB with AWS Managed Streaming for Apache Kafka (MSK)

by 여행을 떠나자! 2022. 8. 1.

2022.08.01

목차

 

1. 개요

2. 아키텍처 구성도

3. 환경 구성  테스트

      A. AWS MSK 구성

      B. AWS DocumentDB 구성

      C. AWS S3 구성

      D. AWS EC2 이용한 MongoDB, Kafka 클라이언트 구성

      E. AWS MSK Connect with Confluent S3 Sink Connector 구성

      F. AWS MSK Connect with MongoDB Connector 구성

      G. Kafka Connector 기능 확인

4. 제약 사항

5. 대안 아키텍처

참조 문서


1. 개요

- MSK (Managed Streaming for Apache Kafka)는 완전 관리형의 고가용성 Apache Kafka 서비스를 통해 안전하게 데이터를 스트리밍 한다.

- MSK Connect는 완전 관리형의 Apache Connect 워크로드를 수행한다.

- MSK(Apache) Connect는 Kafka Broker와 다른 데이터 소스 간에 쉽게 확장 가능하고 회복력 있는 통합 기능을 제공한다.

https://cdn.confluent.io/wp-content/uploads/kafka_connect-1.png

 

- Kafka connect에 사용 가능한 대표적인 Source/Sink Connector들은 아래와 같다.

  https://www.confluent.io/ko-kr/product/confluent-connectors/

   https://lenses.io/connect/

   https://debezium.io/documentation/reference/1.9/connectors/index.html

 

- 본 문서에서 사용한 Connector는 MongoDB Connector와 AWS S3 Sink Connector이다

  ✓ AWS S3 Sink Connector (Confluent사 제공 / Confluent Community License)

      https://www.confluent.io/hub/confluentinc/kafka-connect-s3?utm_medium=sem&utm_source=google&utm_campaign=ch.sem_br.brand_tp.prs_tgt.confluent-brand_mt.mbm_rgn.apac_lng.kor_dv.all_con.confluent-general&utm_term=confluent&placement=&device=c&creative=&gclid=Cj0KCQjw0JiXBhCFARIsAOSAKqCbeuURV0H7IRMpVlE-0HyFhU8V8cpVrCce_AxbFcyK0vfkzn1LmSQaApgrEALw_wcB

   MongoDB Connector (MongoDB사 제공 / Apache license 2.0)

      https://www.confluent.io/hub/mongodb/kafka-connect-mongodb

 

- AWS S3(Simple Storage Service)는 업계 확장성, 데이터 가용성, 보안 및 성능을 제공하는 객체 스토리지 서비스이다.

- AWS DocumentDB는 MongoDB 워크로드를 운영하는 확장 가능하고, 내구성이 우수한 완전 관리형 서비스이다.

 

2. 아키텍처 구성도

A. 구성도

B. 업무 흐름

   - EC2 내의 애플리케이션에서 Avro형식의 데이터를 MSK의 토픽에 퍼블리쉬한다.

   - MSK Connect (Confluent S3)는 토픽의 데이터를 배치 단위로 묶어 S3에 저장한다.

   - MSK Connect (MongoDB)는 토픽의 데이터를 DocumentDB에 저장한다. 

 

C. 소프트웨어 환경

   - MSK / Apache Kafka 2.7.2, Apache Kafka Connect 2.7.1

   - DocumentDB / MongoDB 4.0

   - Confluent S3 Sink Connecter 10.0.11

   - MongoDB Connector 1.7.0

 

3. 환경 구성 테스트

A. MSK 구성

- MSK Cluster 생성

   o Management console에서 “Amazon MSK > MSK Clusters > Clusters > Create cluster” 선택한다.

   o 주요 파라미터 설정

      ▷ Cluster name: msk-connect-cluster

      ▷ Apache Kafka version: 2.7.2

      ▷ Networking: ‘Default VPC’ 사용하는 경우 임의의 zone/subnet 선택, VPC Public Private subnet으로 구성된 경우 Private subnet 선택

      ▷ Security group: ‘default’ 삭제, ‘msk-connect-sg’ 생성

      ▷ Encryption > Between clinets and brokers: ‘Plaintext’ 선택, MSK Connect하에하에 사용할 Connector에서 TLS 연결을 설정할 없음

      ▷ Log group: CloudWatch 서비스에서 Log group ‘/aws/msk/msk-connect’ 생성 선택

      ▷ ‘Configure auto scaling for storage’ 설정한다.

      ▷ 일정 시간이 경과 되면 MSK Cluster 상태가 ‘Active’ 변경 되면서 생성이 완료된다.

      ▷ VPC 내의 MSK 클라이언트들이 접속할 있도록 Security group 변경한다

          사용 중인 VPC 사설 IP 대역에서 접속할 있도록 아래와 같이 Inbound rule 추가한다. VPC IP 대역은 ‘VPC > Your VPCS’에서 ‘IPv4 CIDR’ 값을확인 할 있다.

- MSK 생성 결과

   o MSK Cluster 생성이 완료되면 Kafka producer/comsumer 접속시 사용될 Bootstrap server 정보를 조회 할 있다.

 

B. DocumentDB 구성

- DocumentDB Cluster 생성

   o DocumentDB 생성하기 위해서는 반듯이 default vpc 있어야 한다.

   o Management console에서 “Amazon DocumentDB > Launch Amazon DocumentDB” 선택한다.

   o 주요 파라미터 설정

      ▷ Cluster Identifier: docdb-msk-cluster

      ▷ Master Username: mongoadmin

      ▷ Master password: ********

   o MSK Connect하에하에 사용할 Connector에서 TLS 연결을 설정할 없기 때문에 신규 생성한 DocumentDB TLS 사용을 비활성화한다

      ▷ 신규 Parameter groups 생성하고, TLS 설정을 비활성화 한다.

   o 기본으로 설정된 Security Group parameter 변경한다.

      ▷ 사전에 Management console에서 ‘VPC > Security > security Groups > Create securty group’ 선택 ‘docdb-msk-sg’ 생성한다.

      ▷ default security group 삭제하고 ‘docdb-msk-sg’ 생성후 추가한다

      ▷ Cluster parameter group ‘docdb-msk-cluster’ 변경한다.

      ▷ ‘Apply Immediately’ 선택 ‘Modify cluster’ 선택한다. 수동으로 클러스터를 리부트 해야 반영된다.

   o VPC 내의 DocumentDB 클라이언트들이 접속할 있도록 Security group 변경한다

      ▷ 사용 중인 VPC 사설 IP 대역에서 접속할 있도록 아래와 같이 Inbound rule 추가한다. VPC IP 대역은 ‘VPC > Your VPCS’에서 ‘IPv4 CIDR’ 값을 확인 할 있다.

- DocumentDB 생성 결과

   o DocumentDB 생성이 완료되면 클라이언트가 접속시 사용할 Cluster endpoint 조회 할 있다.

 

C. S3 구성

- S3 Bucket 생성

   o Management console에서 “Amazon S3 > Create bucket” 선택한다.

   o 주요 파라미터 설정

      ▷ Bucket name: 785122******-msk-connect-sink, bucket명은 unique해야 하기 때문에 prefix Account ID 사용

      ▷ AWS Region: Asia Pacific (Seoul) ap-northeast-2

- S3 Gateway endpoint 생성

   o VPC내에서 Private network 통해 S3 접근하도록 S3 Gateway endpoint 생성한다.

 

D. EC2 이용한 MongoDB, Kafka 클라이언트 구성

- EC2 생성

   o MongoDB Kafka 클라이언트 어플리케이션 운영을 위하여 EC2 생성한다

   o 생성한 EC2 ‘Connect > EC2 Instance Connect’ 선택하여 접속한다.

- Kafka Client 설치

   o Kafka 바이너리와 Java 설치한다.

[ec2-user@ip-172-31-17-43 ~]$ sudo yum install java -y
...
[ec2-user@ip-172-31-17-43 ~]$ wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
...
[ec2-user@ip-172-31-17-43 ~]$ tar xzf kafka_2.12-2.2.1.tgz 
[ec2-user@ip-172-31-17-43 ~]$ cd kafka_2.12-2.2.1/bin
[ec2-user@ip-172-31-17-43 bin]$

- Kafka topic 생성

   o 생성된 MSK Bootstrap server 정보를 이용하여 접속한다.

   o Topic: msk-connect-topic (데이터 저장용), msk-connect-dlq-topic (Dead Letter Queue)

[ec2-user@ip-172-31-17-43 bin]$ ./kafka-topics.sh --bootstrap-server b-2.mskconnectcluster.fx1uo9.c22.kafka.us-east-1.amazonaws.com:9092,b-1.mskconnectcluster.fx1uo9.c22.kafka.us-east-1.amazonaws.com:9092,b-3.mskconnectcluster.fx1uo9.c22.kafka.us-east-1.amazonaws.com:9092 \
>   --create --topic msk-connect-topic \
>   --partitions 3 \
>   --replication-factor 2
[ec2-user@ip-172-31-17-43 bin]$ ./kafka-topics.sh --bootstrap-server b-2.mskconnectcluster.fx1uo9.c22.kafka.us-east-1.amazonaws.com:9092,b-1.mskconnectcluster.fx1uo9.c22.kafka.us-east-1.amazonaws.com:9092,b-3.mskconnectcluster.fx1uo9.c22.kafka.us-east-1.amazonaws.com:9092 \
>   --create --topic msk-connect-dlq-topic \
>   --partitions 3 \
>   --replication-factor 2
[ec2-user@ip-172-31-17-43 bin]$ ./kafka-topics.sh --bootstrap-server b-2.mskconnectcluster.fx1uo9.c22.kafka.us-east-1.amazonaws.com:9092,b-1.mskconnectcluster.fx1uo9.c22.kafka.us-east-1.amazonaws.com:9092,b-3.mskconnectcluster.fx1uo9.c22.kafka.us-east-1.amazonaws.com:9092 \
>   --list
__amazon_msk_canary
__consumer_offsets
msk-connect-dlq-topic
msk-connect-topic
[ec2-user@ip-172-31-17-43 bin]$

- Mongo shell 설치

[ec2-user@ip-172-31-17-43 ~]$ sudo vi /etc/yum.repos.d/mongodb-org-6.0.repo
[mongodb-org-6.0]
name=MongoDB Repository
baseurl=https://repo.mongodb.org/yum/redhat/7/mongodb-org/6.0/x86_64/
gpgcheck=1
enabled=1
gpgkey=https://www.mongodb.org/static/pgp/server-6.0.asc
[ec2-user@ip-172-31-17-43 ~]$ sudo yum install -y mongodb-mongosh
...
[ec2-user@ip-172-31-17-43 ~]$

- DocumentDB 접속 MongoDB database 생성

   o MongoDB database: msk-connect-db, Database 생성을 위한 별도의 API 없으며 '"use" 명령어를 사용하면 된다.

[ec2-user@ip-172-31-17-43 ~]$ mongosh "mongodb://mongoadmin:pass1004@docdb-msk-cluster.cluster-cwn9sf0dhwmi.us-east-1.docdb.amazonaws.com/?replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false"Current Mongosh Log ID: 62e7e5a3e1ce14397cb5633b
Connecting to:          mongodb://<credentials>@docdb-msk-cluster.cluster-cwn9sf0dhwmi.us-east-1.docdb.amazonaws.com/?replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false&appName=mongosh+1.5.4
Using MongoDB:          4.0.0
Using Mongosh:          1.5.4

For mongosh info see: https://docs.mongodb.com/mongodb-shell/

rs0 [primary] test> use msk-connect-db
switched to db msk-connect-db
rs0 [primary] msk-connect-db> db.book.insertOne({"name": "MongoDB Tutorial", "author": "velopert"});
{
  acknowledged: true,
  insertedId: ObjectId("62e7e69a4b134bbcab1b9f10")
}
rs0 [primary] msk-connect-db>

- DocumentDB 관련 참고 사항

   o DocumentDB Standard Connection String Format(“mongodb://aws.com”) 지원하고 DNS Seed List Connection Format(“mongodb+srv//...”) 지원하지 않는다.

[ec2-user@ip-172-31-17-43 ~]$ mongosh "mongodb+srv://mongoadmin:pass1004@docdb-msk-cluster.cluster-cwn9sf0dhwmi.us-east-1.docdb.amazonaws.com/?replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false" 
Current Mongosh Log ID: 62e7e80277c877d00a7ab321
Connecting to:          mongodb+srv://<credentials>@docdb-msk-cluster.cluster-cwn9sf0dhwmi.us-east-1.docdb.amazonaws.com/?replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false&appName=mongosh+1.5.4
Error: querySrv ENOTFOUND _mongodb._tcp.docdb-msk-cluster.cluster-cwn9sf0dhwmi.us-east-1.docdb.amazonaws.com
[ec2-user@ip-172-31-17-43 ~]$

      ▷ https://www.mongodb.com/docs/manual/reference/connection-string/?_ga=2.207246417.393721112.1659176655-2045538917.1658848516&_gac=1.19799754.1658851481.Cj0KCQjwof6WBhD4ARIsAOi65ajoPREnkCUHfP0VPLNx7Eu1diotF229WyMSE88w_oxkeFEYNZRcLtoaAhlmEALw_wcB#dns-seed-list-connection-format

   o DocumentDB Connection “replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false” 옵션을 받드시 포함해야 한다.

      ▷ replicaSet: Specifies the name of the replica set, if the mongod is a member of a replica set.

rs0 [primary] test> rs.status()
{
  set: 'rs0',
  date: ISODate("2022-08-01T14:57:13.000Z"),
  myState: 2,
  members: [
    {
      _id: 0,
      name: 'docdb-msk-cluster.cwn9sf0dhwmi.us-east-1.docdb.amazonaws.com:27017',
      health: 1,
      state: 1,
      stateStr: 'PRIMARY',
      uptime: 2701
    },
    {
      _id: 1,
      name: 'docdb-msk-cluster2.cwn9sf0dhwmi.us-east-1.docdb.amazonaws.com:27017',
      health: 1,
      state: 2,
      stateStr: 'SECONDARY',
      uptime: 2701
    },
    {
      _id: 2,
      name: 'docdb-msk-cluster3.cwn9sf0dhwmi.us-east-1.docdb.amazonaws.com:27017',
      health: 1,
      state: 2,
      stateStr: 'SECONDARY',
      self: true,
      uptime: 2701
    }
  ],
  ok: 1,
  operationTime: Timestamp({ t: 1659365829, i: 1 })
}
rs0 [primary] test>

      ▷ readPreference: Specifies the read preferences for this connection.

      ▷ retryWrites: Enable retryable writes. DocumentDB는 retryWrites를 지원하지 않음

rs0 [primary] test> db.book.insertOne({"name": "MongoDB Tutorial", "author": "velopert"});
MongoServerError: Retryable writes are not supported
rs0 [primary] test>

 

E. MSK Connect w/ Confluent S3 Sink Connector 구성

- MSK Custom plugin 생성

   o Confluent S3 Sink Connector https://www.confluent.io/hub/confluentinc/kafka-connect-s3에서 PC 다운로드한다.

   o 다운로드한 파일을 S3 업로드한다.

   o ‘Amazon MSK > Custom plugins > Create custom plugin’ 선택후 Connector 등록한다.

- Confluent S3 Sink Connector IAM Role 생성

   o Management console에서 “IAM > Roles > Create role 선택한다

   o Trusted entity “Use cases for other AWS services: S3” 선택한다

   o “Create policy” 선택후 JSON 탭에서 아래 내용을 입력한다.  Resource 항목에 C절에서 생성한 S3 Bucket명을 입력한다.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListAllMyBuckets"
      ],
      "Resource": "arn:aws:s3:::*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket",
        "s3:GetBucketLocation",
        "s3:DeleteObject"
      ],
      "Resource": "arn:aws:s3:::_785122******-msk-connect-sink_"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:AbortMultipartUpload",
        "s3:ListMultipartUploadParts",
        "s3:ListBucketMultipartUploads"
      ],
      "Resource": "*"
    }
  ]
}

   o Policy명에 “msk-connector-s3-policy” 입력한다.

   o Role명에 “msk-connector-s3-role” 입력한다.

   o ‘msk-connector-s3-role’ rule Trust entities Serice “s3.amazonaws.com”에서 “kafkaconnect.amazonaws.com”으로 변경한다.

- Confluent S3 Sink Connector 생성

   o Connecotor 생성이 완료되면 동시에 Connector worker 실행된다.

   o ‘Amazon MSK > Connector > Create connector’ 선택한다.

   o 이전 단계에서 생성한 plugin 선택한다.

   o Connector name ‘confluent-s3-sink-connector-msk-connector-topic’ 입력한다. 접속할 MSK cluster 선택하고 Authentication ‘None’ 선택한다.

   o Connector configuration 다음과 같이 입력한다. 입력한 configuration 대하여 밑에서 설명하였다.

   o MSK Connect ‘Autoscaled’ ‘Provisioned’ 두가지 방식의 스케일 아웃을 지원한다.

   o IAM에서 사전에 생성한 ‘msk-connector-s3-role’ 선택한다.

   o MSK Broker Connect worker 간간 통신은 Plaintext 한다.

   o Connect worker 로그를 CloudWatch 전송한다.

   o 정상적으로 실행되면 Connector Status Running으로 변경되고, 에러가 발생되면 실행이 중단된다. Connector 생성 과정에서 에러가 발생되면 삭제하고 처음부터 다시 해야 한다.

- Confluent S3 Sink Connector worker 로그 확인

- Connector configuration 설명

   o 설정한 Connector configuration 하나의 예시이며,적용할 시스템의 상황에 맞게 구성해야한다.

      ▷ 자세한 내용은 MongoDB사의 메뉴얼을 참고하기 바란다.

          https://docs.confluent.io/kafka-connect-s3-sink/current/configuration_options.html

connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=2
topics=msk-connect-topic
flush.size=5
rotate.schedule.interval.ms=60000
s3.region=us-east-1
s3.bucket.name=785122731662-msk-connect-sink
format.class=io.confluent.connect.s3.format.avro.AvroFormat
storage.class=io.confluent.connect.s3.storage.S3Storage
value.converter.schemas.enable=false
schema.compatibility=NONE
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
partition.duration.ms=60000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=ko_KR
timezone=Asia/Seoul

      ▷ tasks.max 하나의 Connector worker에서 동시 실행되는 task 수이다.

      ▷ topiics 구독할 MSK cluster topic명이다.

      ▷ flush.size S3 데이터를 저장할 배치 단위이며, rotate.schedule.interval.ms 저장할 시간 단위이다.

      ▷ s3.region s3.bucket.name 데이터가 저장될 S3 정보이다.

      ▷ Partition.class, partition.duration.ms, path.format, local, timezone S3 저장시 사용될 파티션 설정이다.

 

F. MSK Connect w/ MongoDB Connector 구성

- MSK Custom plugin 생성

   o MongoDB Connector https://www.confluent.io/hub/mongodb/kafka-connect-mongodb에서 PC 다운로드한다.

   o 다운로드한 파일을 S3 업로드한다.

   o ‘Amazon MSK > Custom plugins > Create custom plugin’ 선택후 Connector 등록한다.

- MongoDB Connector 생성

   o Connecotor 생성이 완료되면 동시에 Connector worker 실행된다.

   o ‘Amazon MSK > Connector > Create connector’ 선택한다.

   o 이전 단계에서 생성한 plugin 선택한다.

   o Connector name ‘mongodb-connector-msk-connector-topic’ 입력한다. 접속할 MSK cluster 선택하고 Authentication ‘None’ 선택한다.

   o Connector configuration 다음과 같이 입력한다. 입력한 configuration 대하여 밑에서 설명하였다.

   o MSK Connect ‘Autoscaled’ ‘Provisioned’ 두가지 방식의 스케일 아웃을 지원한다.

   o IAM에서 Policy 없는 임의의 Role 생성 선택한다.

   o MSK Broker Connect worker 간간 통신은 Plaintext 한다.

   o Connect worker 로그를 CloudWatch 전송한다.

   o 정상적으로 실행되면 Connector Status Running으로 변경되고, 에러가 발생되면 실행이 중단된다. Connector 생성 과정에서 에러가 발생되면 삭제하고 처음부터 다시 해야 한다.

- Confluent S3 Sink Connector worker 로그 확인

- MongoDB connect IAM Role

   o Connector 등록 과정에서 사용한 ‘msk-connector-role’ Role Permission 없고 ‘Trust relationships’ 있다

- Connector configuration 설명

   o 설정한 Connector configuration 하나의 예시이며,적용할 시스템의 상황에 맞게 구성해야한다.

   o 자세한 내용은 MongoDB사의 메뉴얼을 참고하기 바란다.

       https://www.mongodb.com/docs/kafka-connector/current/sink-connector/configuration-properties/#std-label-kafka-sink-configuration-properties

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=2
topics=msk-connect-topic
connection.uri=mongodb://mongoadmin:pass1004@docdb-msk-cluster.cluster-cwn9sf0dhwmi.us-east-1.docdb.amazonaws.com/?replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false
database=msk-connect-db
collection=msk_connect_sink
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
mongo.errors.tolerance=all
errors.deadletterqueue.topic.name=msk-connect-dlq-topic

       ▷ tasks.max 하나의 Connector worker에서 동시 실행되는 task 수이다.

       ▷ topiics 구독할 MSK cluster topic명이다.

       ▷ connection.uri MongoDB 접속 정보로 신규 구성된 endpoint 변경해야한다.

       ▷ database collection MongoDB 관련된 파라미터이다. collection명에는 ‘-’ 사용을 권고하지 않는다.

       ▷ key.converter value.converter StringConverter 설정하였는데, topic 데이터 포맷으로 Avro Text(JSON) 둘다 지원한다.

       ▷ mongo.errors.tolerance 데이터 처리 에러가 발생될 경우 task 중단하지 않고 계속 실행하게 하는 파라미터이며, 이때 데이터를 deadletterqueue topic으로 전달하여 추후 원인 분석 재처리 하도록 한다.

           https://www.mongodb.com/docs/kafka-connector/current/sink-connector/configuration-properties/error-handling/#dead-letter-queue-configuration-example

 

G. Kafka Connector 기능 확인

- MSK topic 데이터를 퍼블리시

   o 위에서 생성한 EC2 ‘Connect > EC2 Instance Connect’ 선택하여 접속한다.

   o kafka-console-producer.sh 명령어로 AVRO 형식의 데이터를 퍼블리시한다.

[ec2-user@ip-172-31-17-43 ~]$ cd ./kafka_2.12-2.2.1/bin
[ec2-user@ip-172-31-17-43 bin]$
ec2-user@ip-172-31-17-43 bin]$ echo '{"username": "ys", "age": 52}' | ~/kafka_2.12-2.2.1/bin/kafka-console-producer.sh --broker-list b-2.mskconnectcluster.fx1uo9.c22.kafka.us-east-1.amazonaws.com:9092,b-1.mskconnectcluster.fx1uo9.c22.kafka.us-east-1.amazonaws.com:9092,b-3.mskconnectcluster.fx1uo9.c22.kafka.us-east-1.amazonaws.com:9092 \
   --topic msk-connect-topic \
   --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"username","type":"string"},{"name":"age","type":"int"}]}'
[ec2-user@ip-172-31-17-43 bin]$

- S3 결과 확인 - Confluent S3 Sink Connector

- DocumentDB 결과 확인 - MongoDB connector

   o mongosh 명령어로 DocumentDB 접속하여 정상적으로 데이터가 입력 되었는지 확인한다

[ec2-user@ip-172-31-17-43 ~]$ mongosh "mongodb://mongoadmin:pass1004@docdb-msk-cluster.cluster-cwn9sf0dhwmi.us-east-1.docdb.amazonaws.com/msk-connect-db?replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false"
Current Mongosh Log ID: 62e8807555854009d05f20a4
Connecting to:          mongodb://<credentials>@docdb-msk-cluster.cluster-cwn9sf0dhwmi.us-east-1.docdb.amazonaws.com/msk-connect-db?replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false&appName=mongosh+1.5.4
Using MongoDB:          4.0.0
Using Mongosh:          1.5.4

For mongosh info see: https://docs.mongodb.com/mongodb-shell/

rs0 [primary] msk-connect-db> db.msk_connect_sink.find({})
[
  {
    _id: ObjectId("62e8877584ff9504c2a8c61a"),
    username: 'ys',
    age: 52
  }
]rs0 [primary] msk-connect-db>

 

4. 제약 사항

- MSK Connect에서 확장성을 제공하지만, MSK Connector worker(e.g. Mongo Connector) 모든 Task 에러로 중단될 경우 수동으로 MSK Connector 삭제하고 다시 생성해야 한다.

- MSK Connector 설정된 Configuration 포함한 설정내용들은 변경할 없으며, MSK Connector 삭제하고 다시 생성해야 한다.

- MSK Connector 중지하거나 시작하는 기능은 현재 제공되지 않는다.

 

5. 대안 아키텍처

- Kinesis Data Streams Kinesis Data Firehose 활용하면 MSK Connect 제약 사항을 해소할 있다.

- Kinesis Data Streams 조정 가능하고 내구성이 뛰어난 실시간 데이터 스트리밍 서비스이다.

- Kinesis Data Firehose 데이터 스트림을 캡처 변환하여 AWS 데이터 스토어로 로드할 있는 서비스이다.

 

참조 문서

- MSK Connect

   o https://catalog.us-east-1.prod.workshops.aws/workshops/c2b72b6f-666b-4596-b8bc-bafa5dcca741/en-US/mskconnect

- Confluent S3 Sink Connector

   o https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-connectors.html

   o https://docs.confluent.io/kafka-connect-s3-sink/current/configuration_options.html

- MongoDB Connector

   o https://www.mongodb.com/developer/languages/java/integrating-mongodb-amazon-apache-kafka/

   o https://www.mongodb.com/docs/kafka-connector/current/

   o https://www.mongodb.com/docs/kafka-connector/current/sink-connector/configuration-properties/#std-label-kafka-sink-configuration-properties

- Lenses Connector (IAM Role 참조)

  o https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-getting-started.html

  o https://awstip.com/use-amazon-msk-connect-with-lenses-plugin-to-sink-data-from-amazon-msk-to-amazon-s3-8d2af3f37004

댓글