Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #10 from ProntoPro/develop
Browse files Browse the repository at this point in the history
Release 0.1.0
  • Loading branch information
zuc authored Mar 19, 2019
2 parents fb9b738 + 697b325 commit 30b0e60
Show file tree
Hide file tree
Showing 11 changed files with 744 additions and 121 deletions.
8 changes: 5 additions & 3 deletions .drone.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ pipeline:
image: python:3.7.1-alpine
commands:
- pip3 install --upgrade pip setuptools wheel
- pip3 install pylint
- pip3 install astroid==2.1.0
- pip3 install pylint==2.2.2
- pip3 install -e .
- pylint -E target_kinesis/*.py
when:
Expand All @@ -13,10 +14,11 @@ pipeline:
test:
image: python:3.7.1-alpine
commands:
- apk add --no-cache libffi-dev openssl-dev build-base
- pip3 install --upgrade pip setuptools wheel
- pip3 install pytest
- pip3 install pytest==4.2.0 pytest-cov==2.6.1 pytest-mock==1.10.1 moto==1.3.7
- pip3 install -e .
- pytest -p no:warnings
- pytest -p no:warnings --cov=target_kinesis tests/
when:
event:
- push
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ Here is an example of required configuration:
"aws_access_key_id": "YOUR_AWS_ACCESS_KEY_ID",
"aws_secret_access_key": "YOUR_AWS_SECRET_ACCESS_KEY",
"region": "YOUR_AWS_REGION",
"is_firehose": true
"is_firehose": true,
"record_chunks": 10,
"data_chunks": 1000
}
```

Expand All @@ -50,3 +52,9 @@ To run `target-kinesis` with the configuration file, use this command:
[Singer Tap]: https://singer.io
[Mac]: http://docs.python-guide.org/en/latest/starting/install3/osx/
[Ubuntu]: https://www.digitalocean.com/community/tutorials/how-to-install-python-3-and-set-up-a-local-programming-environment-on-ubuntu-16-04

### Run test suite

```
pytest -p no:warnings --cov=target_kinesis tests/test_target.py --cov-report=html
```
4 changes: 3 additions & 1 deletion config.sample.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,7 @@
"aws_access_key_id": "YOUR_AWS_ACCESS_KEY_ID",
"aws_secret_access_key": "YOUR_AWS_SECRET_ACCESS_KEY",
"region": "YOUR_AWS_REGION",
"is_firehose": true
"is_firehose": true,
"record_chunks": 10,
"data_chunks": 1000
}
5 changes: 4 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
extras_require={
'dev': [
'pylint==2.1.1',
'pytest==4.2.0'
'pytest==4.2.0',
'pytest-cov==2.6.1',
'pytest-mock==1.10.1',
'moto==1.3.7'
]
},
entry_points="""
Expand Down
19 changes: 1 addition & 18 deletions target_kinesis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,7 @@
import json
import io

from .target import *

def main():
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--config', help='Config file')
args = parser.parse_args()

if args.config:
with open(args.config) as input:
config = json.load(input)
else:
config = {}

input = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
state = persist_lines(config, input)

emit_state(state)
logger.debug("Exiting normally")
from .target import main


if __name__ == '__main__':
Expand Down
51 changes: 28 additions & 23 deletions target_kinesis/firehose.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,32 @@
import json
import singer

logger = singer.get_logger()

def deliver(config, record):
stream_name = config.get("stream_name")
aws_access_key_id = config.get("aws_access_key_id")
aws_secret_access_key = config.get("aws_secret_access_key")
region_name = config.get("region_name", "eu-west-2")

client = boto3.client(
'firehose',
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name
)

response = client.put_record(
DeliveryStreamName=stream_name,
Record={
'Data': json.dumps(record) + "\n"
}
)

logger.info(response)

def firehose_setup_client(config):
aws_access_key_id = config.get("aws_access_key_id")
aws_secret_access_key = config.get("aws_secret_access_key")
region_name = config.get("region_name", "eu-west-2")
return boto3.client(
'firehose',
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name
)


def firehose_deliver(client, stream_name, records):

if len(records) == 0:
raise Exception("Record list is empty")

if isinstance(records, dict):
raise Exception("Single record given, array is required")

encoded_records = map(lambda x: json.dumps(x), records)
payload = ("\n".join(encoded_records) + "\n")

response = client.put_record(
DeliveryStreamName=stream_name,
Record={'Data': payload}
)
return response
51 changes: 32 additions & 19 deletions target_kinesis/kinesis.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,35 @@
import boto3
import json
import singer

def deliver(config, record):
stream_name = config.get("stream_name")
partition_key = config.get("partition_key", "id")
aws_access_key_id = config.get("aws_access_key_id")
aws_secret_access_key = config.get("aws_secret_access_key")
region_name = config.get("region_name")

client = boto3.client(
'kinesis',
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name
)

response = client.put_record(
StreamName=stream_name,
Data=json.dumps(record).encode(),
PartitionKey=record[partition_key]
)

def kinesis_setup_client(config):
aws_access_key_id = config.get("aws_access_key_id")
aws_secret_access_key = config.get("aws_secret_access_key")
region_name = config.get("region_name", "eu-west-2")

return boto3.client(
'kinesis',
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name
)


def kinesis_deliver(client, stream_name, partition_key, records):

if len(records) == 0:
raise Exception("Record list is empty")

if isinstance(records, dict):
raise Exception("Single record given, array is required")

encoded_records = map(lambda x: json.dumps(x), records)
payload = ("\n".join(encoded_records) + "\n")

response = client.put_record(
StreamName=stream_name,
Data=payload.encode(),
PartitionKey=records[0][partition_key]
)
return response
Loading

0 comments on commit 30b0e60

Please sign in to comment.