The GlassFlow Python SDK provides a convenient way to interact with the GlassFlow API in your Python applications. The SDK is used to publish and consume events to your GlassFlow pipelines.
You can install the GlassFlow Python SDK using pip:
pip install glassflow
- publish - Publish a new event into the pipeline
- consume - Consume the transformed event from the pipeline
- consume failed - Consume the events that failed from the pipeline
- validate credentials - Validate pipeline credentials
Publish a new event into the pipeline
from glassflow import PipelineDataSource
source = PipelineDataSource(pipeline_id="<str value", pipeline_access_token="<str token>")
data = {} # your json event
res = source.publish(request_body=data)
if res.status_code == 200:
print("Published sucessfully")
Consume the transformed event from the pipeline
from glassflow import PipelineDataSink
sink = PipelineDataSink(pipeline_id="<str value", pipeline_access_token="<str value>")
res = sink.consume()
if res.status_code == 200:
print(res.json())
If the transformation failed for any event, they are available in a failed queue. You can consume those events from the pipeline
from glassflow import PipelineDataSink
sink = PipelineDataSink(pipeline_id="<str value", pipeline_access_token="<str value>")
res = sink.consume_failed()
if res.status_code == 200:
print(res.json())
Validate pipeline credentials (pipeline_id
and pipeline_access_token
) from source or sink
from glassflow import PipelineDataSource, errors
try:
source = PipelineDataSource(pipeline_id="<str value", pipeline_access_token="<str value>")
source.validate_credentials()
except errors.PipelineNotFoundError as e:
print("Pipeline ID does not exist!")
raise e
except errors.PipelineAccessTokenInvalidError as e:
print("Pipeline Access Token is invalid!")
raise e
In order to manage your pipelines with this SDK, one needs to provide the PERSONAL_ACCESS_TOKEN
to the GlassFlow client.
from glassflow import GlassFlowClient
client = GlassFlowClient(personal_access_token="<your personal access token>")
Now you can perform CRUD operations on your pipelines:
- list_pipelines - Returns the list of pipelines available
- get_pipeline - Returns a pipeline object from a given pipeline ID
- create - Create a new pipeline
- delete - Delete an existing pipeline
Returns information about the available pipelines. It can be restricted to a
specific space by passing the space_id
.
from glassflow import GlassFlowClient
client = GlassFlowClient(personal_access_token="<your access token>")
res = client.list_pipelines()
Gets information about a pipeline from a given pipeline ID. It returns a Pipeline object which can be used manage the Pipeline.
from glassflow import GlassFlowClient
client = GlassFlowClient(personal_access_token="<your access token>")
pipeline = client.get_pipeline(pipeline_id="<your pipeline id>")
print("Name:", pipeline.name)
The Pipeline object has a create method that creates a new GlassFlow pipeline.
from glassflow import Pipeline
pipeline = Pipeline(
name="<your pipeline name>",
transformation_file="path/to/transformation.py",
space_id="<your space id>",
personal_access_token="<your personal access token>"
).create()
In the next example we create a pipeline with Google PubSub source and a webhook sink:
from glassflow import Pipeline
pipeline = Pipeline(
name="<your pipeline name>",
transformation_file="path/to/transformation.py",
space_id="<your space id>",
personal_access_token="<your personal access token>",
source_kind="google_pubsub",
source_config={
"project_id": "<your gcp project id>",
"subscription_id": "<your subscription id>",
"credentials_json": "<your credentials json>"
},
sink_kind="webhook",
sink_config={
"url": "<webhook url>",
"method": "<GET | POST | PUT | PATCH | DELETE>",
"headers": [{"header1": "header1_value"}]
}
).create()
The Pipeline object has a delete method to delete a pipeline
from glassflow import Pipeline
pipeline = Pipeline(
name="<your pipeline name>",
transformation_file="path/to/transformation.py",
space_id="<your space id>",
personal_access_token="<your personal access token>"
).create()
pipeline.delete()
Follow the quickstart guide here
Please note that the GlassFlow Python SDK is currently in beta and is subject to potential breaking changes. We recommend keeping an eye on the official documentation and updating your code accordingly to ensure compatibility with future versions of the SDK.
Anyone who wishes to contribute to this project, whether documentation, features, bug fixes, code cleanup, testing, or code reviews, is very much encouraged to do so.
-
Join the Slack channel.
-
Just raise your hand on the GitHub discussion board.
If you are unfamiliar with how to contribute to GitHub projects, here is a Get Started Guide. A full set of contribution guidelines, along with templates, are in progress.
For any questions, comments, or additional help, please reach out to us via email at help@glassflow.dev. Please check out our Q&A to get solutions for common installation problems and other issues.
To provide feedback or report a bug, please raise an issue on our issue tracker.