AWS Kinesis producer testing
This guide describes how to set up the AWS CLI to check whether the AWS Producer is pushing data to the Kinesis stream successfully.
Install the AWS CLI by following the official guide, if it is not already installed:
https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html
Configure a named profile with:
aws configure --profile test-graphyte
AWS Access Key ID [None]:
AWS Secret Access Key [None]:
Default region name [None]: eu-west-1
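Before querying Kinesis, it can help to confirm that the profile's credentials are accepted at all. A minimal sketch, assuming a hypothetical helper named check_profile (not part of the AWS CLI); it wraps the real aws sts get-caller-identity command, which needs no special permissions:

```shell
# check_profile is a hypothetical helper name used only for this example.
# It prints the ARN of the identity behind the given CLI profile,
# or fails if the credentials are missing or rejected.
check_profile() {
  # $1 = CLI profile name
  aws sts get-caller-identity --profile "$1" --query 'Arn' --output text
}

# Usage:
#   check_profile test-graphyte
```

If the command prints an ARN, the access key and secret key were entered correctly.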
Amazon Kinesis uses streams and shards to manage data flows and scaling. They are comparable to Kafka topics and partitions.
Ask your Integration Manager for the relevant stream name and use it in subsequent calls. For example, for the stream some-test-stream, run:
aws kinesis list-shards --stream-name some-test-stream --profile test-graphyte
Expected output:
{
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49614023790783601483728905336480125602121137113222086658"
}
}
]
}
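A stream can have more than one shard, and Kinesis routes each record to a shard based on the hash of its partition key, so the producer's records are not guaranteed to land in shardId-000000000000. A small sketch for listing every shard ID, one per line; list_shard_ids is a hypothetical helper name, while the --query and --output options are standard AWS CLI options:

```shell
# list_shard_ids is a hypothetical helper name used only for this example.
# It prints every shard ID of a stream, one per line.
list_shard_ids() {
  # $1 = stream name, $2 = CLI profile name
  aws kinesis list-shards --stream-name "$1" --profile "$2" \
    --query 'Shards[].ShardId' --output text | tr '\t' '\n'
}

# Usage:
#   list_shard_ids some-test-stream test-graphyte
```

If several shard IDs are printed, repeat the read steps for each of them before concluding that no data arrived.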
To read data from Kinesis, you first need a shard iterator. Use the shard ID shardId-000000000000 from the response above.
--shard-iterator-type can be:
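- TRIM_HORIZON - the oldest record still retained in the shard (records are retained for 24 hours by default).
- LATEST - just after the most recent record, so only records written after the iterator was created are returned.
- For the other types, see the docs: https://docs.aws.amazon.com/cli/latest/reference/kinesis/get-shard-iterator.html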
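As an illustration of one of the other types, AT_TIMESTAMP starts reading at a given point in time, which is handy when only the last few minutes of producer activity matter. A minimal sketch, assuming a hypothetical helper named iterator_since; it passes the timestamp as Unix epoch seconds, which the AWS CLI accepts for timestamp parameters:

```shell
# iterator_since is a hypothetical helper name used only for this example.
# It prints a shard iterator positioned N seconds in the past.
# The body runs in a subshell so the "start" variable does not leak.
iterator_since() (
  # $1 = stream name, $2 = shard ID, $3 = seconds to look back, $4 = CLI profile
  start=$(( $(date +%s) - $3 ))
  aws kinesis get-shard-iterator --stream-name "$1" --shard-id "$2" \
    --shard-iterator-type AT_TIMESTAMP --timestamp "$start" \
    --query 'ShardIterator' --output text --profile "$4"
)

# Usage (records from the last 10 minutes):
#   SHARD_ITERATOR=$(iterator_since some-test-stream shardId-000000000000 600 test-graphyte)
```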
The next step is to get a shard iterator and use it to fetch records:
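# --output text returns the bare iterator string instead of a JSON-quoted value
SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name some-test-stream --query 'ShardIterator' --output text --profile test-graphyte)
# Quote the variable so the shell passes the iterator through unchanged
aws kinesis get-records --shard-iterator "$SHARD_ITERATOR" --profile test-graphyte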
Response:
{
"Records": [],
"NextShardIterator": "AAAAAAAAAAEROtFEiWoVdkmoa62xtMBG0LHYJVnQ24V5f2AgVR0dKTz021+VN2DZsDOM0zkKNra0ObvMV83s+e3xzxJ057eS+ESfxUtnd9megbDLlb7pJIP5r2NxsMgEr1LbAZRBozHt6HkNP0Gv94rSTVhoelmuvrCQpEBDqWNPwAbxh8MTbDZoh0HIH72Igz3ZVb3wkWAWuzCsY+YwaEq/q1OG1vXq38p76qCo9q+fL5u2zMIE0wLRxA9tCCmBuMRgUIFHXhk=",
"MillisBehindLatest": 85128000
}
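"Records": [] means that this call returned no records. It does not necessarily mean the stream is empty: a non-zero MillisBehindLatest, as in the response above, indicates that the iterator has not yet caught up with the tip of the stream, so keep reading before concluding that the producer sent nothing.
When records are present, the Data field of each record is base64-encoded.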
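To inspect the payloads in readable form, the Data values can be piped through base64. A minimal sketch, assuming a hypothetical helper named decode_data and a base64 tool that accepts --decode (GNU coreutils and macOS both do); it only makes sense when the response actually contains records:

```shell
# decode_data is a hypothetical helper name used only for this example.
# It reads whitespace-separated base64 strings from stdin and prints
# each decoded payload on its own line.
decode_data() {
  tr -s ' \t' '\n\n' | while IFS= read -r encoded; do
    [ -n "$encoded" ] || continue
    printf '%s' "$encoded" | base64 --decode
    printf '\n'
  done
}

# Usage:
#   aws kinesis get-records --shard-iterator "$SHARD_ITERATOR" \
#     --query 'Records[].Data' --output text --profile test-graphyte | decode_data
```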
To read the next batch of records, pass the iterator returned in NextShardIterator to the next get-records call.
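Following NextShardIterator by hand gets tedious, so the loop can be scripted. The sketch below assumes a hypothetical helper named follow_shard and a hypothetical FOLLOW_DELAY variable for the pause between calls. It extracts the next iterator from the JSON response with sed to avoid extra dependencies; a JSON tool such as jq would be a more robust choice if it is installed.

```shell
# follow_shard is a hypothetical helper name used only for this example.
# It prints each get-records response and keeps following
# NextShardIterator until it is absent or the batch limit is reached.
# The body runs in a subshell so its variables do not leak.
follow_shard() (
  # $1 = initial shard iterator, $2 = CLI profile, $3 = max batches (default 10)
  iterator=$1
  batches=${3:-10}
  i=0
  while [ "$i" -lt "$batches" ] && [ -n "$iterator" ]; do
    response=$(aws kinesis get-records --shard-iterator "$iterator" \
      --profile "$2" --output json) || exit 1
    printf '%s\n' "$response"
    # Pull the quoted iterator value out of the JSON; empty if it is null or missing
    iterator=$(printf '%s\n' "$response" |
      sed -n 's/.*"NextShardIterator": *"\([^"]*\)".*/\1/p')
    i=$((i + 1))
    # Pause between calls to stay under the per-shard read rate limit
    sleep "${FOLLOW_DELAY:-1}"
  done
)

# Usage:
#   follow_shard "$SHARD_ITERATOR" test-graphyte 20
```

The batch limit keeps the loop from running forever on an open shard, where NextShardIterator is always returned.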