StarQuest Technical Documents

Using SQDR and Amazon Managed Streaming for Kafka

Last Update: 29 November 2021
Product: StarQuest Data Replicator
Version: SQDR 5.18 or later
Article ID: SQV00DR043

Abstract

This technical document provides detailed platform-specific information for using the Kafka support of SQDR with Amazon Managed Streaming for Apache Kafka (MSK).

See the technical document Using the SQDR Kafka Producer for general information.

Amazon Managed Streaming for Apache Kafka (MSK) can be used in place of a self-managed Kafka cluster: as a message broker between the SQDR Producer and a consumer; as a method of storing data into a destination such as Amazon Redshift or the Amazon S3 object store (using AWS Glue, or using the Kafka-Kinesis-Connector and Amazon Kinesis Data Firehose); or as a way of supplying data to an analysis tool such as Amazon Kinesis Data Analytics.

Solution

Using MSK as a Message Broker

Finding the Connection String

After creating your MSK cluster, select View client information and record the host and port connection information for the Bootstrap servers.
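You can also retrieve the bootstrap brokers with the AWS CLI; the following is a minimal sketch, assuming the CLI is installed and configured, with a placeholder for your cluster ARN:

aws kafka get-bootstrap-brokers --cluster-arn arn:aws:kafka:REGION:ACCOUNT:cluster/CLUSTER/UUID

The output includes BootstrapBrokerString (plaintext, port 9092) and BootstrapBrokerStringTls (TLS, port 9094).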

Configuring Access

To access your Amazon MSK cluster from a client in the same Amazon VPC as the cluster, make sure the cluster's security group has an inbound rule that accepts traffic from the client's security group. To access your MSK cluster from a different VPC, create a peering connection between the two VPCs.
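The inbound rule can also be added with the AWS CLI; a sketch, where the cluster and client security group IDs are placeholders:

aws ec2 authorize-security-group-ingress --group-id sg-CLUSTER --protocol tcp --port 9092-9094 --source-group sg-CLIENT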

At the time this article was first written (June 2020), MSK was accessible from systems external to AWS only through VPN connections, AWS Direct Connect, or AWS Transit Gateway. As of November 2021, Amazon MSK offers the option to enable public access; see the topic Public access in the MSK documentation. Note that enabling public access requires that plaintext traffic between brokers and clients be turned off and that unauthenticated access control be turned off; using TLS and access-control methods may involve certificate management and more complicated configuration.
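If you connect over TLS, point the client at the TLS bootstrap servers (port 9094) and enable SSL in its Kafka properties. A minimal sketch, assuming TLS encryption with no client authentication (the Amazon-issued broker certificates chain to a CA already present in the default Java truststore):

# Kafka client properties for a TLS-only MSK listener (sketch; no client authentication)
security.protocol=SSL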

Creating a Topic

The cluster can be configured with auto.create.topics.enable=true. However, if you experience problems, create the topic in advance:

C:\bin\kafka_2.12-2.5.0\bin\windows> kafka-topics --bootstrap-server mysys.amazonaws.com:9092 --create --topic testtopic --partitions 2 --replication-factor 1
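To confirm that the topic exists, describe it with the same tool (the hostname is a placeholder, as in the command above):

C:\bin\kafka_2.12-2.5.0\bin\windows> kafka-topics --bootstrap-server mysys.amazonaws.com:9092 --describe --topic testtopic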

For more information, see:

Getting Started Using Amazon MSK (Amazon documentation)

Getting started with AWS MSK (blog posting)


Using AWS Glue with Managed Streaming for Apache Kafka (Streaming to S3)

The following is based on this blog posting:
Serverless Streaming ETL with AWS Glue

This example targets S3. Streaming data to Redshift is similar.

Initial Setup - Create a topic, configure the Security Group, and test message broker function

First create your Amazon MSK cluster and verify that you can use it as a message broker between the SQDR Producer and a consumer.

For initial testing, we recommend running SQDR, the producer, and a test consumer on a Windows EC2 instance located in the same VPC and subnet as your MSK cluster, and adding the following rules to the cluster's Security Group: one allowing Kafka traffic from your EC2 instance, and one allowing ingress traffic from the Security Group itself so that traffic can flow from MSK to Glue to S3.

Inbound rules:

Type        Protocol   Port range    Source
Custom TCP  TCP        9092 - 9094   0.0.0.0/0
All TCP     TCP        0 - 65535     sg-b04e87d6 (default)

Create the topic:

C:\bin\kafka_2.12-2.5.0\bin\windows> kafka-topics --bootstrap-server mysys.amazonaws.com:9092 --create --topic testtopic --partitions 2 --replication-factor 1
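To verify message broker function without involving SQDR, you can run the console producer and consumer shipped with Kafka in two separate command windows; lines typed into the producer should appear in the consumer (the hostname is a placeholder, as above):

C:\bin\kafka_2.12-2.5.0\bin\windows> kafka-console-producer --bootstrap-server mysys.amazonaws.com:9092 --topic testtopic

C:\bin\kafka_2.12-2.5.0\bin\windows> kafka-console-consumer --bootstrap-server mysys.amazonaws.com:9092 --topic testtopic --from-beginning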

Create an IAM (Identity and Access Management) Role for AWS Glue

  1. Open the IAM console at https://console.aws.amazon.com/iam/
  2. In the left navigation pane, choose Roles.
  3. Choose Create role.
  4. For role type, choose AWS Service, find and choose Glue, and choose Next: Permissions.
  5. On the Attach permissions policy page, choose the policies that contain the required permissions. For example, use the AWS managed policy AWSGlueServiceRole for general AWS Glue permissions and the AWS managed policy AmazonS3FullAccess for access to Amazon S3 resources. Then choose Next: Review.
  6. For Role name, enter a name for your role; for example, AWSGlueServiceRoleDefault. Create the role with the name prefixed with the string AWSGlueServiceRole to allow the role to be passed from console users to the service. AWS Glue provided policies expect IAM service roles to begin with AWSGlueServiceRole.
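The same role can be set up with the AWS CLI; the following is a sketch, assuming a local file glue-trust.json containing a trust policy that lets glue.amazonaws.com assume the role:

aws iam create-role --role-name AWSGlueServiceRoleDefault --assume-role-policy-document file://glue-trust.json
aws iam attach-role-policy --role-name AWSGlueServiceRoleDefault --policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
aws iam attach-role-policy --role-name AWSGlueServiceRoleDefault --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess

where glue-trust.json contains:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": "glue.amazonaws.com" },
      "Action": "sts:AssumeRole"
    }
  ]
}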

Create an S3 Endpoint in your VPC

See https://docs.aws.amazon.com/glue/latest/dg/vpc-endpoints-s3.html

  1. Open https://console.aws.amazon.com/vpc/
  2. In the left navigation pane, choose Endpoints.
  3. Choose Create Endpoint, and follow the steps to create an Amazon S3 endpoint in your VPC. e.g. service name com.amazonaws.us-east-1.s3
  4. Add your subnets to the route table.
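The endpoint can also be created with the AWS CLI; a sketch with placeholder VPC and route table IDs, assuming the us-east-1 service name from the example above:

aws ec2 create-vpc-endpoint --vpc-id vpc-PLACEHOLDER --service-name com.amazonaws.us-east-1.s3 --route-table-ids rtb-PLACEHOLDER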

Create a Glue Connection

Go to AWS Glue in the AWS Console and create a Glue connection.

  1. Name the Glue connection e.g. msk1
  2. Connection Type=Kafka
  3. For Kafka bootstrap server URLs, specify the TLS connection, which uses port 9094; e.g. b-2.mysys.amazonaws.com:9094,b-1.mysys.amazonaws.com:9094
  4. Select the VPC and Subnet from the dropdown lists.
  5. Specify the Security group.
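The same connection can be created with the AWS CLI; a sketch, assuming a local file msk1-connection.json in which the subnet, availability zone, and security group IDs are placeholders:

aws glue create-connection --connection-input file://msk1-connection.json

where msk1-connection.json contains:

{
  "Name": "msk1",
  "ConnectionType": "KAFKA",
  "ConnectionProperties": {
    "KAFKA_BOOTSTRAP_SERVERS": "b-2.mysys.amazonaws.com:9094,b-1.mysys.amazonaws.com:9094"
  },
  "PhysicalConnectionRequirements": {
    "SubnetId": "subnet-PLACEHOLDER",
    "AvailabilityZone": "us-east-1a",
    "SecurityGroupIdList": [ "sg-PLACEHOLDER" ]
  }
}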

Create a Database and Tables in AWS Glue

Choose Databases / Add database and enter a name, e.g. test.

Then choose Databases / Add tables (manually) and specify:

Table name      testtab
Database        test
Source          Kafka
Topic name      testtopic
Connection      msk1
Classification  JSON

Add columns
Add the following column names (all of Data type string)

group
txid
seq
operation
beforekey
dest
row
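These column names correspond to the top-level fields of the JSON change records emitted by the SQDR Kafka Producer (see the technical document Using the SQDR Kafka Producer for the exact format). For orientation only, a record has roughly this shape; every value below is a placeholder rather than actual SQDR output:

{
  "group": "...",
  "txid": "...",
  "seq": "...",
  "operation": "...",
  "beforekey": "...",
  "dest": "...",
  "row": "..."
}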

Create S3 Bucket to Act as a Target

  1. Create an S3 target bucket e.g. aws-glue-target-us-east-1
  2. On the permissions screen, allow public access, or use an ACL (Access Control List)
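The bucket can also be created from the command line; a sketch using the example name above (S3 bucket names are globally unique, so yours will differ):

aws s3 mb s3://aws-glue-target-us-east-1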

Create & run a Glue Job

  1. Create a Glue Job
    Name MSK1-S3
    IAM role AWSGlueServiceRoleDefault.
    Type Spark Streaming
  2. Note the locations of
    S3 path where the script is stored
    S3 path for temporary space
  3. Choose Next.
  4. Choose a data source: testtab
  5. Choose whether to
    Create tables in your data target
    Use tables in the data catalog and update your data target
  6. Choose target path (the S3 target you created)
  7. Map the source columns to target columns.
  8. Save the job and run it. It may take up to 10 minutes to start, and it should continue running until you choose to stop it.
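You can also monitor the job from the command line; a sketch using the example job name above:

aws glue get-job-runs --job-name MSK1-S3 --max-results 5

The JobRunState field in the output shows values such as STARTING, RUNNING, SUCCEEDED, or FAILED.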

When the Kafka producer runs, your Kafka consumer will receive data. In addition, an object will be created in the S3 bucket in a folder structure showing year, month, date, hour:

e.g.
ingest_year=2020/ingest_month=07/ingest_day=06/ingest_hour=19/run-1594064703389-part-r-00000

Download this object and examine it to confirm that it contains your data.
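The same check can be done with the AWS CLI, using the example bucket name and object key above:

aws s3 ls s3://aws-glue-target-us-east-1/ --recursive
aws s3 cp s3://aws-glue-target-us-east-1/ingest_year=2020/ingest_month=07/ingest_day=06/ingest_hour=19/run-1594064703389-part-r-00000 .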

If necessary, refresh the S3 contents listing with the refresh icon (two circular arrows, next to the Region name).

Troubleshooting

If the job fails to run, hover over the Error field in Job History (the error appears only as a red exclamation mark and the truncated text “J...”).

Error:
failed to execute with exception At least one security group must open all ingress ports. To limit traffic, the source security group in your inbound rule can be restricted to the same security group

Solution: Edit the Security Group to allow ingress traffic from the Security Group itself.

Error:
failed to execute with exception VPC S3 endpoint validation failed for SubnetId: subnet-eaffd7c1. VPC: vpc-43c76727. Reason: Could not find S3 endpoint or NAT gateway for subnetId: subnet-eaffd7c1 in Vpc vpc-43c76727

Solution: Add an endpoint for S3.

 


DISCLAIMER

The information in technical documents comes without any warranty or applicability for a specific purpose. The author(s) or distributor(s) will not accept responsibility for any damage incurred directly or indirectly through use of the information contained in these documents. The instructions may need to be modified to be appropriate for the hardware and software that has been installed and configured within a particular organization.  The information in technical documents should be considered only as an example and may include information from various sources, including IBM, Microsoft, and other organizations.