This library allows your agent code to work with Amazon Web Services’ Kinesis Streams. It makes use of the Kinesis Streams REST API.
This version of the library supports the following functionality:
- Writing data records into an Amazon Kinesis stream.
- Getting data records from an Amazon Kinesis stream’s shard.
The AWSKinesisStreams library utilizes the AWSRequestV4 library.
To add this library to your project, add the following lines to the top of your agent code:
#require "AWSRequestV4.class.nut:1.0.2"
#require "AWSKinesisStreams.agent.lib.nut:1.1.0"
A complete, step-by-step recipe can be found in the Examples folder.
The library consists of two essentially independent parts: one for reading data and one for writing it. You can instantiate and use either or both of these parts in your agent code, as required by your application. The library also includes some common components which are used by both of the main parts.
Before using the library you need to have:
- The Region Code of Amazon EC2 (see the Amazon EC2 documentation).
- The Access Key ID of an AWS IAM user (see the Kinesis Streams documentation).
- The Secret Access Key of an AWS IAM user (see the Kinesis Streams documentation).
You also need to understand the main Amazon Kinesis Streams concepts and terms, such as stream, shard and record, and to know the name of the Kinesis stream which your application is going to work with.
All requests that are made to the AWSKinesisStreams library occur asynchronously. Every method that sends a request has a parameter which takes a callback function that will be called when the operation is completed, whether successfully or not. The callback’s parameters are listed in the corresponding method documentation, but every callback has at least one parameter, error. If error is null, the operation has been executed successfully. Otherwise, error is an instance of the AWSKinesisStreams.Error class and contains the details of the error.
This class represents an error returned by the library. As such it will be generated for you. It has the following public properties:
- type — The error type, which is one of the following AWS_KINESIS_STREAMS_ERROR enum values:
    - AWS_KINESIS_STREAMS_ERROR.LIBRARY_ERROR — The library has been wrongly initialized, a method has been called with invalid argument(s), or an internal error has occurred. The error details can be found in the details property. This usually indicates an issue in application development which should be fixed during debugging, so it should not occur after the application has been deployed.
    - AWS_KINESIS_STREAMS_ERROR.REQUEST_FAILED — An HTTP request to Amazon Kinesis Streams failed. The error details can be found in the details, httpStatus and httpResponse properties. This error may occur during the normal execution of an application, so the application logic should handle it.
    - AWS_KINESIS_STREAMS_ERROR.UNEXPECTED_RESPONSE — An unexpected response was received from Amazon Kinesis Streams. The error details can be found in the details and httpResponse properties.
- details — A string with human readable details of the error.
- httpStatus — An integer indicating the HTTP status code, or null if type is AWS_KINESIS_STREAMS_ERROR.LIBRARY_ERROR.
- httpResponse — A table of key-value strings holding the response body of the failed request, or null if type is AWS_KINESIS_STREAMS_ERROR.LIBRARY_ERROR.
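As a rough illustration of how these properties fit together, the sketch below turns an error into a log message by branching on its type. The helper name describeError() is hypothetical and not part of the library; the #require lines are the ones given earlier in this document.

```squirrel
#require "AWSRequestV4.class.nut:1.0.2"
#require "AWSKinesisStreams.agent.lib.nut:1.1.0"

// Hypothetical helper (not part of the library): builds a log message
// from an AWSKinesisStreams.Error instance
function describeError(error) {
    switch (error.type) {
        case AWS_KINESIS_STREAMS_ERROR.LIBRARY_ERROR:
            // httpStatus and httpResponse are null for this type
            return "Library error: " + error.details;
        case AWS_KINESIS_STREAMS_ERROR.REQUEST_FAILED:
            return "Request failed (HTTP " + error.httpStatus + "): " + error.details;
        case AWS_KINESIS_STREAMS_ERROR.UNEXPECTED_RESPONSE:
            return "Unexpected response: " + error.details;
    }
    return "Unknown error: " + error.details;
}
```

A callback could then log the result with server.error(describeError(error)) whenever error is not null.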
This is the parent class for AWSKinesisStreams.Producer and AWSKinesisStreams.Consumer. You will not work with this class but with instances of its child classes, all of which respond to the following method:
This method enables (value is true) or disables (value is false) the library debug output (including error logging). It is disabled by default. The method returns nothing.
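A minimal sketch of switching the debug output on for a producer. It assumes the method described above is exposed as setDebug(), a name this section does not show, so please check it against the library source. The credentials are placeholders.

```squirrel
#require "AWSRequestV4.class.nut:1.0.2"
#require "AWSKinesisStreams.agent.lib.nut:1.1.0"

// Placeholder values: substitute your own
producer <- AWSKinesisStreams.Producer("<YOUR_AWS_REGION>",
                                       "<YOUR_AWS_ACCESS_KEY_ID>",
                                       "<YOUR_AWS_SECRET_ACCESS_KEY>",
                                       "<YOUR_KINESIS_STREAM_NAME>");

// Assumed method name: enable the library's debug output during development
producer.setDebug(true);
```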
This class represents an AWS Kinesis Streams record: a combination of data attributes. It has the following public properties:
Property | Data Type | Description |
---|---|---|
data | Blob or JSON-compatible type | The record data |
partitionKey | String | Identifies which shard in the stream the data record is assigned to (see the Kinesis Streams documentation) |
sequenceNumber | String | The unique identifier of the record within its shard (see the Kinesis Streams documentation) |
timestamp | Integer | The approximate time that the record was inserted into the stream, expressed in seconds since the Unix epoch (midnight, 1 Jan 1970) |
encryptionType | AWS_KINESIS_STREAMS_ENCRYPTION_TYPE | The encryption type used on the record |
Constructor: AWSKinesisStreams.Record(data, partitionKey[, explicitHashKey][, prevSequenceNumber][, encoder])
This method creates and returns an AWSKinesisStreams.Record object that can be written into an Amazon Kinesis stream using AWSKinesisStreams.Producer methods.
Parameter | Data Type | Required | Description |
---|---|---|---|
data | Blob or JSON-compatible type | Yes | The record data |
partitionKey | String | Yes | Identifies which shard in the stream the data record is assigned to (see the Kinesis Streams documentation) |
explicitHashKey | String | No | The hash value used to explicitly determine the shard the data record is assigned to by overriding the partition key hash (see the Kinesis Streams documentation) |
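prevSequenceNumber | String | No | The sequence number of a previously written record, used by Kinesis Streams to keep records with the same partition key in order (see the Kinesis Streams documentation) |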
encoder | Function | No | A custom JSON encoder function for encoding the provided data (e.g., JSONEncoder.encode()) |
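The sketch below shows a few ways the constructor might be called with the parameters listed above. The partition key "sensor-1" and the explicit hash key "0" are illustrative values only, not recommendations.

```squirrel
#require "AWSRequestV4.class.nut:1.0.2"
#require "AWSKinesisStreams.agent.lib.nut:1.1.0"

// A JSON-compatible table as the record data
local jsonRecord = AWSKinesisStreams.Record({ "temperature" : 21.5 }, "sensor-1");

// Binary data passed as a Squirrel blob
local payload = blob();
payload.writestring("raw bytes");
local blobRecord = AWSKinesisStreams.Record(payload, "sensor-1");

// An explicit hash key (illustrative value) overrides the partition key hash
local pinnedRecord = AWSKinesisStreams.Record("Hello!", "sensor-1", "0");
```

The optional parameters are positional, so a later one such as encoder can only be supplied if null is passed for the ones before it.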
The encryption type used on a record. It has the following values:
- AWS_KINESIS_STREAMS_ENCRYPTION_TYPE.NONE — Record is not encrypted.
- AWS_KINESIS_STREAMS_ENCRYPTION_TYPE.KMS — Record is encrypted on server side using a customer-managed KMS key.
For more information, please see the Kinesis Streams documentation.
This class allows the agent to write data records to a specific AWS Kinesis stream. One instance of this class writes data to one stream. The stream’s name as well as the region and the user identification are specified in the class constructor.
Creates and returns an AWSKinesisStreams.Producer object. The constructor’s parameters are as follows:
Parameter | Data Type | Required | Description |
---|---|---|---|
region | String | Yes | The Region code of Amazon EC2 (see the Amazon EC2 documentation) |
accessKeyId | String | Yes | The access key ID of an AWS IAM user (see the Kinesis Streams documentation) |
secretAccessKey | String | Yes | The secret access key of an AWS IAM user (see the Kinesis Streams documentation) |
streamName | String | Yes | The name of the Amazon Kinesis stream |
This method writes a single data record into the AWS Kinesis stream. For more information, please see the corresponding Kinesis Streams REST API action.
Parameter | Data Type | Required | Description |
---|---|---|---|
record | AWSKinesisStreams.Record | Yes | The record to be written |
callback | Function | No | Executed once the operation is completed |
The method returns nothing. The result of the operation may be obtained via the callback function, which has the following parameters:
Parameter | Data Type | Description |
---|---|---|
error | AWSKinesisStreams.Error | Error details, or null if the operation succeeded |
putRecordResult | AWSKinesisStreams.PutRecordResult | The information from AWS Kinesis Streams about the written data record, or null if the operation failed |
This method writes multiple data records into the AWS Kinesis stream in a single request. Every record is processed by AWS individually. Some of the records may be written successfully but some may fail. For more information, please see the corresponding Kinesis Streams REST API action.
Parameter | Data Type | Required | Description |
---|---|---|---|
records | Array of AWSKinesisStreams.Record | Yes | The records to be written |
callback | Function | No | Executed once the operation is completed |
The method returns nothing. The result of the operation may be obtained via the callback function, which has the following parameters:
Parameter | Data Type | Description |
---|---|---|
error | AWSKinesisStreams.Error | Error details, or null if the operation succeeds or partially succeeds |
failedRecordCount | Integer | The number of unsuccessfully written records |
putRecordResults | Array of AWSKinesisStreams.PutRecordResult | Array with the information from AWS Kinesis Streams about every processed data record, whether it is written successfully or not. Each record in the array directly correlates with a record in the records array using natural ordering, from the top to the bottom of the records and putRecordResults. If error is not null then putRecordResults is empty, otherwise the putRecordResults array includes the same number of records as the records array |
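Because putRecordResults lines up with records by index, an application can pick out the records that failed and send them again. The sketch below is one possible approach, not the library's own mechanism: collectFailedRecords() and putWithRetry() are hypothetical helpers, and the one-second delay and the retry limit are arbitrary choices.

```squirrel
#require "AWSRequestV4.class.nut:1.0.2"
#require "AWSKinesisStreams.agent.lib.nut:1.1.0"

// Hypothetical helper: returns the records whose write failed,
// relying on the index-by-index match between the two arrays
function collectFailedRecords(records, putRecordResults) {
    local failed = [];
    foreach (index, result in putRecordResults) {
        if (result.errorCode != null) {
            failed.append(records[index]);
        }
    }
    return failed;
}

// Hypothetical helper: writes the records and re-sends the failed ones
// up to 'attemptsLeft' more times
function putWithRetry(producer, records, attemptsLeft) {
    producer.putRecords(records, function (error, failedRecordCount, putRecordResults) {
        if (error) {
            // The whole request failed, putRecordResults is empty
            server.error("putRecords failed: " + error.details);
            return;
        }
        if (failedRecordCount == 0) return;

        local failed = collectFailedRecords(records, putRecordResults);
        if (attemptsLeft > 0) {
            // Arbitrary pause before trying the failed records again
            imp.wakeup(1.0, function () {
                putWithRetry(producer, failed, attemptsLeft - 1);
            });
        } else {
            server.error(failed.len() + " record(s) could not be written");
        }
    });
}
```

With a producer instance in hand, a call such as putWithRetry(producer, records, 3) would attempt the write and up to three re-sends.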
This class represents information from AWS Kinesis Streams about a written data record. It has the following public properties:
Property | Data Type | Description |
---|---|---|
errorCode | String | The error code for the data record, or null if the record is written successfully (see the Kinesis Streams documentation) |
errorMessage | String | The error message for the data record, or null if the record is written successfully (see the Kinesis Streams documentation) |
shardId | String | The ID of the shard where the data record has been written, or null if the write failed |
sequenceNumber | String | The unique identifier of the record within its shard, or null if the write failed |
encryptionType | AWS_KINESIS_STREAMS_ENCRYPTION_TYPE | The encryption type used on the record, or null if the write failed |
This class allows your code to read data records from a specific AWS Kinesis Stream.
This method creates and returns an AWSKinesisStreams.Consumer object. The constructor’s parameters are as follows:
Parameter | Data Type | Required | Description |
---|---|---|---|
region | String | Yes | The Region code of Amazon EC2 (see the EC2 documentation) |
accessKeyId | String | Yes | The access key ID of an AWS IAM user (see the Kinesis Streams documentation) |
secretAccessKey | String | Yes | The secret access key of an AWS IAM user (see the Kinesis Streams documentation) |
streamName | String | Yes | The name of an AWS Kinesis stream |
isBlob | Boolean | No | If true, the AWSKinesisStreams.Consumer object will treat every received data record as a Squirrel blob. If false or not specified, it will treat every received data record as JSON data and parse it into an appropriate JSON-compatible type |
Before creating an AWSKinesisStreams.Consumer instance, your code should know which type of data it is going to receive: binary data (a Squirrel blob) or a JSON-compatible type. This choice is specified in the AWSKinesisStreams.Consumer constructor and cannot be changed afterwards. In more complex cases, your application can request the data as a blob and parse it into a specific or custom type itself.
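The following sketch illustrates the blob option: the consumer is created with isBlob set to true, and the application decodes each record itself. decodeRecordData() is a hypothetical helper that assumes the data is JSON text and falls back to the raw string when it is not. The credentials are placeholders.

```squirrel
#require "AWSRequestV4.class.nut:1.0.2"
#require "AWSKinesisStreams.agent.lib.nut:1.1.0"

// Passing true as the fifth argument delivers every record's data as a blob
consumer <- AWSKinesisStreams.Consumer("<YOUR_AWS_REGION>",
                                       "<YOUR_AWS_ACCESS_KEY_ID>",
                                       "<YOUR_AWS_SECRET_ACCESS_KEY>",
                                       "<YOUR_KINESIS_STREAM_NAME>",
                                       true);

// Hypothetical helper: interprets a received blob as JSON text
function decodeRecordData(data) {
    local text = data.tostring();
    try {
        return http.jsondecode(text);
    } catch (exception) {
        // Not valid JSON: hand back the raw string instead
        return text;
    }
}
```

A getRecords() callback could then call decodeRecordData(record.data) for each record it receives.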
This method retrieves a list of the IDs of all the shards in the AWS Kinesis stream, including closed shards. Closed shards may still contain records your application may need to read.
Parameter | Data Type | Required | Description |
---|---|---|---|
callback | Function | Yes | Executed once the operation is completed |
The method returns nothing. The result of the operation may be obtained via the callback function, which has the following parameters:
Parameter | Data Type | Description |
---|---|---|
error | AWSKinesisStreams.Error | Error details, or null if the operation succeeded |
shardIds | Array of strings | The IDs of the stream’s shards. The array is empty if the operation failed |
This method obtains a shard iterator for the position in the shard from which your code wants to start reading. The iterator is then used to initiate the reading process. For more information, please see the corresponding Kinesis Streams REST API action.
Note Every shard iterator returned by getShardIterator() or getRecords() expires five minutes after it is returned. Your application should call getRecords() with the iterator before it expires, otherwise the call will fail and your code will need to obtain a new iterator using getShardIterator().
Parameter | Data Type | Required | Description |
---|---|---|---|
shardId | String | Yes | The shard ID. |
type | AWS_KINESIS_STREAMS_SHARD_ITERATOR_TYPE | Yes | The shard iterator type. Determines how the shard iterator is used to start reading data records from the shard. Some of the types require the corresponding typeOptions to be specified |
typeOptions | Table | Yes | Additional options required for some of the shard iterator types specified by the type parameter (see below). Pass null if the additional options are not required for the specified iterator type |
callback | Function | Yes | Executed once the operation is completed |
typeOptions Key | Data Type | Description |
---|---|---|
startingSequenceNumber | String | The sequence number of the data record in the shard from which to start reading. Must be specified if the type parameter is AWS_KINESIS_STREAMS_SHARD_ITERATOR_TYPE.AT_SEQUENCE_NUMBER or AWS_KINESIS_STREAMS_SHARD_ITERATOR_TYPE.AFTER_SEQUENCE_NUMBER |
timestamp | Integer | The timestamp of the data record from which to start reading, expressed in seconds since the Unix epoch (midnight, 1 Jan 1970). Must be specified if the type parameter is AWS_KINESIS_STREAMS_SHARD_ITERATOR_TYPE.AT_TIMESTAMP (see the Kinesis Streams documentation for the behavior details) |
The method returns nothing. The result of the operation may be obtained via the callback function, which has the following parameters:
Parameter | Data Type | Description |
---|---|---|
error | AWSKinesisStreams.Error | Error details, or null if the operation succeeded |
shardIterator | String | The shard iterator, or null if the operation failed |
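To show how typeOptions is used with an iterator type that requires it, the sketch below requests an iterator positioned a given number of seconds in the past. iteratorFromPast() and its onIterator parameter are hypothetical names chosen for this illustration.

```squirrel
#require "AWSRequestV4.class.nut:1.0.2"
#require "AWSKinesisStreams.agent.lib.nut:1.1.0"

// Hypothetical helper: obtains an iterator that starts 'secondsAgo' seconds
// before now and passes it to 'onIterator'
function iteratorFromPast(consumer, shardId, secondsAgo, onIterator) {
    // AT_TIMESTAMP requires the timestamp option, in seconds since the Unix epoch
    local typeOptions = { "timestamp" : time() - secondsAgo };
    consumer.getShardIterator(
        shardId,
        AWS_KINESIS_STREAMS_SHARD_ITERATOR_TYPE.AT_TIMESTAMP,
        typeOptions,
        function (error, shardIterator) {
            if (error) {
                server.error("getShardIterator failed: " + error.details);
                return;
            }
            onIterator(shardIterator);
        }
    );
}
```

For the types that need no options, such as TRIM_HORIZON or LATEST, null is passed in place of the table.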
This method reads a portion of data records using the specified shard iterator and returns the next shard iterator, which can be used to read the next portion of data records by calling getRecords() again. Reading always proceeds from older records towards newer ones. For more information, please see the corresponding Kinesis Streams REST API action.
Parameter | Data Type | Required | Description |
---|---|---|---|
options | Table | Yes | Options for the operation (see below) |
callback | Function | Yes | Executed once the operation is completed |
options key | Data Type | Required | Description |
---|---|---|---|
shardIterator | String | Yes | The shard iterator that specifies the position in the shard from which the reading should be started |
limit | Integer | No | The maximum number of data records to read. If not specified, the number of returned records is determined by AWS Kinesis Streams (see the Kinesis Streams documentation) |
The method returns nothing. The result of the operation may be obtained via the callback function, which has the following parameters:
Parameter | Data Type | Description |
---|---|---|
error | AWSKinesisStreams.Error | Error details, or null if the operation succeeded |
records | Array of AWSKinesisStreams.Record | The data records retrieved from the shard. The array is empty if the operation failed or there are no new records in the shard for the specified shard iterator |
millisBehindLatest | Integer | The number of milliseconds by which the response lags behind the tip of the stream. Zero if there are no new records in the shard for the specified shard iterator (see the Kinesis Streams documentation) |
nextOptions | Table | Options which can be used as the options parameter in the next getRecords() call. nextOptions is null if the operation failed, or the shard has been closed and the specified shard iterator has reached the last record in the shard and will not return any more data |
nextOptions key | Data Type | Description |
---|---|---|
shardIterator | String | The new shard iterator returned by AWS Kinesis Streams. Can be used as the shard iterator in the next getRecords() call |
limit | Integer | The maximum number of data records to read. The same value as in the options table. Will not be present if it was not included in the options table |
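The sketch below is one way an application might keep reading a shard and recover when a getRecords() call fails, for example because the iterator has expired. It remembers the sequence number of the last processed record and asks for a new iterator placed just after it. readShard() and resumeShard() are hypothetical helpers, and the delays and the limit of 10 are arbitrary.

```squirrel
#require "AWSRequestV4.class.nut:1.0.2"
#require "AWSKinesisStreams.agent.lib.nut:1.1.0"

// Hypothetical helper: obtains an iterator after the last processed record,
// or at the oldest record if nothing has been processed yet
function resumeShard(consumer, shardId, lastSequenceNumber) {
    local iteratorType = AWS_KINESIS_STREAMS_SHARD_ITERATOR_TYPE.TRIM_HORIZON;
    local typeOptions = null;
    if (lastSequenceNumber != null) {
        iteratorType = AWS_KINESIS_STREAMS_SHARD_ITERATOR_TYPE.AFTER_SEQUENCE_NUMBER;
        typeOptions = { "startingSequenceNumber" : lastSequenceNumber };
    }
    consumer.getShardIterator(shardId, iteratorType, typeOptions, function (error, shardIterator) {
        if (error) {
            server.error("getShardIterator failed: " + error.details);
            return;
        }
        readShard(consumer, shardId, { "shardIterator" : shardIterator, "limit" : 10 }, lastSequenceNumber);
    });
}

// Hypothetical helper: reads one portion of records and schedules the next read
function readShard(consumer, shardId, options, lastSequenceNumber) {
    consumer.getRecords(options, function (error, records, millisBehindLatest, nextOptions) {
        if (error) {
            server.error("getRecords failed: " + error.details);
            if (error.type == AWS_KINESIS_STREAMS_ERROR.REQUEST_FAILED) {
                // The iterator may have expired: request a fresh one after a pause
                imp.wakeup(5.0, function () {
                    resumeShard(consumer, shardId, lastSequenceNumber);
                });
            }
            return;
        }
        local last = lastSequenceNumber;
        foreach (record in records) {
            // Process the record here, then remember its position
            last = record.sequenceNumber;
        }
        if (nextOptions) {
            imp.wakeup(1.0, function () {
                readShard(consumer, shardId, nextOptions, last);
            });
        }
    });
}
```

Reading would be started with resumeShard(consumer, shardId, null) for each shard ID of interest.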
If your application needs to read all the records in the stream, it should read them from all the shards in the stream. The library allows you to obtain shard iterators for different shards of the same stream and read from the shards in parallel. The list of shards changes when shards are merged or split. The application can get the latest list of shards by calling getShards() periodically, but it should be sufficient to make this check only when getRecords() returns a nextOptions of null for any shard. A shard ID never disappears from the list, but new IDs may appear.
Note Every shard iterator returned by getShardIterator() or getRecords() expires five minutes after it is returned. Your application should call getRecords() with the iterator before it expires, otherwise the call will fail and your code will need to obtain a new iterator using getShardIterator().
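One way to follow this advice is to keep track of the shard IDs that already have a reader and to start readers only for IDs that are new. The sketch below assumes this bookkeeping approach: activeShards and startNewShardReaders() are hypothetical names, and startReading stands for whatever function your application uses to begin reading a shard.

```squirrel
#require "AWSRequestV4.class.nut:1.0.2"
#require "AWSKinesisStreams.agent.lib.nut:1.1.0"

// Hypothetical bookkeeping table: shard IDs that already have a reader
activeShards <- {};

// Hypothetical helper: fetches the shard list and calls 'startReading'
// for every shard ID that has not been seen before
function startNewShardReaders(consumer, startReading) {
    consumer.getShards(function (error, shardIds) {
        if (error) {
            server.error("getShards failed: " + error.details);
            return;
        }
        foreach (shardId in shardIds) {
            if (!(shardId in activeShards)) {
                activeShards[shardId] <- true;
                startReading(shardId);
            }
        }
    });
}
```

The function can be called once at start-up and again whenever getRecords() returns a nextOptions of null.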
The shard iterator type. It determines how the shard iterator is used to start reading data records from the shard. It has the following values:
- AWS_KINESIS_STREAMS_SHARD_ITERATOR_TYPE.AT_SEQUENCE_NUMBER — Start reading from the position denoted by a specific record sequence number.
- AWS_KINESIS_STREAMS_SHARD_ITERATOR_TYPE.AFTER_SEQUENCE_NUMBER — Start reading right after the position denoted by a specific record sequence number.
- AWS_KINESIS_STREAMS_SHARD_ITERATOR_TYPE.AT_TIMESTAMP — Start reading from the position denoted by a specific timestamp.
- AWS_KINESIS_STREAMS_SHARD_ITERATOR_TYPE.TRIM_HORIZON — Start reading at the last untrimmed record in the shard, which is the oldest data record in the shard.
- AWS_KINESIS_STREAMS_SHARD_ITERATOR_TYPE.LATEST — Start reading just after the most recent record in the shard, so that you always read the most recent data in the shard.
For more information, please see the Kinesis Streams documentation.
A type of Squirrel data which can be encoded into and decoded from JSON, e.g., table, array, string, boolean, integer, float. For more details, please see the http.jsonencode() and http.jsondecode() method descriptions.
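As a small illustration, the sketch below passes a table built from JSON-compatible types through http.jsonencode() and http.jsondecode(). The field names are made up for the example.

```squirrel
// A table built only from JSON-compatible types
local reading = {
    "temperature" : 21.5,
    "tags" : ["indoor", "lab"],
    "ok" : true
};

// Encode to a JSON string and decode it back into Squirrel data
local json = http.jsonencode(reading);
local decoded = http.jsondecode(json);

server.log(decoded.temperature);
```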
#require "AWSRequestV4.class.nut:1.0.2"
#require "AWSKinesisStreams.agent.lib.nut:1.1.0"
#require "JSONEncoder.class.nut:2.0.0"
// This class can be used to hold numbers larger than Squirrel can natively support (ie. anything larger than 32-bit)
// and then be encoded as a number (rather than a string) when encoded with 'JSONEncoder.encode()'.
class JSONLiteralString {
    _string = null;
    constructor (string) {
        _string = string.tostring();
    }
    function _serializeRaw() {
        return _string;
    }
    function toString() {
        return _string;
    }
}
// Substitute with real values
const AWS_KINESIS_REGION = "<YOUR_AWS_REGION>";
const AWS_KINESIS_ACCESS_KEY_ID = "<YOUR_AWS_ACCESS_KEY_ID>";
const AWS_KINESIS_SECRET_ACCESS_KEY = "<YOUR_AWS_SECRET_ACCESS_KEY>";
const AWS_KINESIS_STREAM_NAME = "<YOUR_KINESIS_STREAM_NAME>";
// Instantiation of AWS Kinesis Streams producer
producer <- AWSKinesisStreams.Producer(AWS_KINESIS_REGION, AWS_KINESIS_ACCESS_KEY_ID, AWS_KINESIS_SECRET_ACCESS_KEY, AWS_KINESIS_STREAM_NAME);
// Writes single data record
producer.putRecord(AWSKinesisStreams.Record("Hello!", "partitionKey"), function (error, putResult) {
    if (error) {
        server.error("Data writing failed: " + error.details);
    } else {
        // Record written successfully
    }
});
// Writes multiple records with different data structures
records <- [
    AWSKinesisStreams.Record("test", "partitionKey1"),
    AWSKinesisStreams.Record(12345, "partitionKey2"),
    AWSKinesisStreams.Record({ "temperature" : 21, "humidity" : 60 }, "partitionKey3"),
    // Write record using custom encoder
    AWSKinesisStreams.Record({ "a" : JSONLiteralString("123456789123456789") }, "partitionKey4", null, null, JSONEncoder.encode.bindenv(JSONEncoder))
];
producer.putRecords(records, function (error, failedRecordCount, putResults) {
    if (error) {
        server.error("Data writing failed: " + error.details);
    } else if (failedRecordCount > 0) {
        server.log("Data writing partially failed:");
        foreach (res in putResults) {
            if (res.errorCode) {
                server.log(format("%s: %s", res.errorCode, res.errorMessage));
            }
        }
    } else {
        // Records written successfully
    }
});
#require "AWSRequestV4.class.nut:1.0.2"
#require "AWSKinesisStreams.agent.lib.nut:1.1.0"
// Substitute with real values
const AWS_KINESIS_REGION = "<YOUR_AWS_REGION>";
const AWS_KINESIS_ACCESS_KEY_ID = "<YOUR_AWS_ACCESS_KEY_ID>";
const AWS_KINESIS_SECRET_ACCESS_KEY = "<YOUR_AWS_SECRET_ACCESS_KEY>";
const AWS_KINESIS_STREAM_NAME = "<YOUR_KINESIS_STREAM_NAME>";
// Instantiation of AWS Kinesis Streams consumer
consumer <- AWSKinesisStreams.Consumer(AWS_KINESIS_REGION, AWS_KINESIS_ACCESS_KEY_ID, AWS_KINESIS_SECRET_ACCESS_KEY, AWS_KINESIS_STREAM_NAME);
// Obtains the stream shards
consumer.getShards(function (error, shardIds) {
    if (error) {
        server.error("getShards failed: " + error.details);
    } else {
        foreach (shardId in shardIds) {
            getShardIterator(shardId);
        }
    }
});
// Obtains shard iterator for the specified shard and starts reading records
function getShardIterator(shardId) {
    consumer.getShardIterator(
        shardId,
        AWS_KINESIS_STREAMS_SHARD_ITERATOR_TYPE.TRIM_HORIZON,
        null,
        function (error, shardIterator) {
            if (error) {
                server.error("getShardIterator failed: " + error.details);
            } else {
                // Shard iterator obtained successfully
                readRecords({ "shardIterator" : shardIterator, "limit" : 10 });
            }
        }
    );
}
// Recursively reads records from the specified shard
function readRecords(options) {
    consumer.getRecords(
        options,
        function (error, records, millisBehindLatest, nextOptions) {
            if (error) {
                server.error("Data reading failed: " + error.details);
            } else {
                if (records.len() == 0) {
                    // No new records
                } else {
                    foreach (record in records) {
                        // Process records individually
                    }
                }
                if (nextOptions) {
                    // Read next portion of records
                    imp.wakeup(10.0, function () {
                        readRecords(nextOptions);
                    });
                }
            }
        }
    );
}
Working examples are also provided in the Examples directory and described here.
This library is licensed under the MIT License.