Merge pull request #1 from fnery/add-project
Add project
fnery authored May 15, 2024
2 parents c21f602 + d841d01 commit 5c15d58
Showing 17 changed files with 360 additions and 0 deletions.
72 changes: 72 additions & 0 deletions .github/workflows/run.yml
@@ -0,0 +1,72 @@
name: Run Fitbit steps pipeline

on:
  schedule:
    - cron: '0 */2 * * *'
  workflow_dispatch:

env:
  GCLOUD_SERVICE_ACCOUNT_KEY_FILE: ${{ secrets.GCLOUD_SERVICE_ACCOUNT_KEY_FILE }}
  DESTINATION__BIGQUERY__LOCATION: ${{ secrets.DESTINATION__BIGQUERY__LOCATION }}
  FITBIT_ACCESS_TOKEN: ${{ secrets.FITBIT_ACCESS_TOKEN }}

jobs:
  maybe_skip:
    runs-on: ubuntu-latest
    outputs:
      should_skip: ${{ steps.skip_check.outputs.should_skip }}
    steps:
      - id: skip_check
        uses: fkirc/skip-duplicate-actions@v5
        with:
          concurrent_skipping: always
          skip_after_successful_duplicate: 'false'
          do_not_skip: '[]'

  run_pipeline:
    needs: maybe_skip
    if: needs.maybe_skip.outputs.should_skip != 'true'
    runs-on: ubuntu-latest
    steps:
      - name: Check out code
        uses: actions/checkout@v3

      - name: Create gcloud service account file and store its path in an environment variable
        run: |
          GCLOUD_SERVICE_ACCOUNT_KEY_FILE_PATH=$(pwd)/gcloud_service_account_key_file.json
          echo "$GCLOUD_SERVICE_ACCOUNT_KEY_FILE" > $GCLOUD_SERVICE_ACCOUNT_KEY_FILE_PATH
          echo "GCLOUD_SERVICE_ACCOUNT_KEY_FILE_PATH=$GCLOUD_SERVICE_ACCOUNT_KEY_FILE_PATH" >> $GITHUB_ENV

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10.x'

      - name: Restore virtual environment from cache
        uses: syphar/restore-virtualenv@v1
        id: cache-virtualenv
        with:
          requirement_files: requirements.txt

      - name: Restore pip download cache
        uses: syphar/restore-pip-download-cache@v1
        if: steps.cache-virtualenv.outputs.cache-hit != 'true'

      - name: Install Python dependencies
        run: pip install -r requirements.txt
        if: steps.cache-virtualenv.outputs.cache-hit != 'true'

      - name: Extract data from Fitbit API and load to BigQuery
        run: |
          export GOOGLE_APPLICATION_CREDENTIALS=${{ env.GCLOUD_SERVICE_ACCOUNT_KEY_FILE_PATH }}
          python ingest.py
        working-directory: ingest

      - name: Compile dbt project
        run: dbt compile --profiles-dir . --target prod
        working-directory: transform

      - name: Run dbt project
        run: dbt run --profiles-dir . --target prod
        working-directory: transform
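
Since the cron above fires every two hours, consecutive runs fetch overlapping date windows; `ingest/ingest.py` (next file) derives each window from the last date already loaded. An illustrative sketch of that window logic (the dates here are made up):

```python
# Illustrative only: how ingest.py derives the fetch window on each scheduled run.
from datetime import date, timedelta

last_loaded_date = date(2024, 5, 14)  # stands in for MAX(date_time) in BigQuery
today = date.today()

# Refetching from one day before the last loaded date lets the merge write
# disposition update the (possibly partial) most recent day instead of duplicating it.
start = last_loaded_date - timedelta(days=1)
print(f"Fetching steps from {start} to {today}")
```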

69 changes: 69 additions & 0 deletions ingest/ingest.py
@@ -0,0 +1,69 @@
import os
from datetime import date, datetime, timedelta

import dlt
from dlt.sources.helpers import requests
from dotenv import load_dotenv
from google.cloud import bigquery
from google.cloud.exceptions import NotFound

DATASET_NAME = "fitbit_ingest"
TABLE_NAME = "steps"
START_DATE = "2024-01-02"
FITBIT_API_URL = "https://api.fitbit.com/1/user/-/activities/steps/date"


def get_last_loaded_date(client):
    """Retrieve the last date for which data was loaded."""
    query = f"""
        SELECT MAX(date_time) AS max_loaded_date
        FROM `{client.project}.{DATASET_NAME}.{TABLE_NAME}`
    """
    try:
        result = client.query(query).result()
        return datetime.strptime(list(result)[0].max_loaded_date, '%Y-%m-%d').date()
    except NotFound:
        # The table does not exist yet; start loading from the configured start date.
        return datetime.strptime(START_DATE, '%Y-%m-%d').date() + timedelta(days=1)


def fetch_fitbit_data(start_date, end_date):
    """Fetch step data from the Fitbit API for the given date range."""
    url = f"{FITBIT_API_URL}/{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}.json"
    headers = {
        'accept': 'application/json',
        'authorization': f'Bearer {os.getenv("FITBIT_ACCESS_TOKEN")}'
    }
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()['activities-steps']


def main():
    """Orchestrate data loading from the Fitbit API to BigQuery."""
    # Set up local environment variables if not running in GitHub Actions
    if os.getenv('GITHUB_ACTIONS') != 'true':
        load_dotenv()
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = '../GCLOUD_SERVICE_ACCOUNT_KEY_FILE.json'

    client = bigquery.Client()

    last_loaded_date = get_last_loaded_date(client)
    today = date.today()
    load_period = today - last_loaded_date

    if load_period.days > 1095:
        # Maximum date range for the `steps` endpoint is 1095 days
        # https://dev.fitbit.com/build/reference/web-api/activity-timeseries/get-activity-timeseries-by-date-range/#Resource-Options
        raise ValueError(f"The difference in days ({load_period.days}) is greater than 1095 days.")

    data = fetch_fitbit_data(last_loaded_date - timedelta(days=1), today)
    pipeline = dlt.pipeline(
        pipeline_name="steps_pipeline",
        destination="bigquery",
        dataset_name=DATASET_NAME,
    )
    load_info = pipeline.run(
        data,
        table_name=TABLE_NAME,
        write_disposition="merge",
        primary_key="date_time"
    )
    print(load_info)


if __name__ == "__main__":
    main()
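
For reference, the Fitbit time-series endpoint returns records with a camel-case `dateTime` key, which dlt's default naming convention normalizes to `date_time`; that is why `date_time` serves both as the merge primary key above and as the column referenced downstream in dbt. A minimal sketch of that behavior (the sample payload is assumed, and a local duckdb destination is used purely for experimentation):

```python
# Minimal sketch, not part of this commit: load an assumed sample payload into a
# local duckdb destination to observe dlt's column normalization and merge behavior.
import dlt

sample = [
    {"dateTime": "2024-05-01", "value": "8123"},   # shape of 'activities-steps' records
    {"dateTime": "2024-05-02", "value": "10456"},
]

pipeline = dlt.pipeline(
    pipeline_name="steps_demo",
    destination="duckdb",          # the real pipeline uses "bigquery"
    dataset_name="fitbit_ingest",
)

# "dateTime" is normalized to "date_time", matching the real pipeline's merge key.
info = pipeline.run(sample, table_name="steps", write_disposition="merge", primary_key="date_time")
print(info)
```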
6 changes: 6 additions & 0 deletions requirements.txt
@@ -0,0 +1,6 @@
dlt==0.4.10
google-cloud-bigquery==3.22.0
python-dotenv==1.0.1
dbt-core==1.8.0
dbt-bigquery==1.8.0
streamlit==1.34.0
5 changes: 5 additions & 0 deletions transform/.gitignore
@@ -0,0 +1,5 @@

target/
dbt_packages/
logs/
.user.yml
15 changes: 15 additions & 0 deletions transform/README.md
@@ -0,0 +1,15 @@
Welcome to your new dbt project!

### Using the starter project

Try running the following commands:
- dbt run
- dbt test


### Resources:
- Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)
- Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers
- Join the [chat](https://community.getdbt.com/) on Slack for live discussions and support
- Find [dbt events](https://events.getdbt.com) near you
- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices
Empty file added transform/analyses/.gitkeep
Empty file.
36 changes: 36 additions & 0 deletions transform/dbt_project.yml
@@ -0,0 +1,36 @@

# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'fitbit_steps'
version: '1.0.0'

# This setting configures which "profile" dbt uses for this project.
profile: 'fitbit_steps'

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets: # directories to be removed by `dbt clean`
- "target"
- "dbt_packages"


# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models

# In this example config, we tell dbt to build all models in the example/
# directory as views. These settings can be overridden in the individual model
# files using the `{{ config(...) }}` macro.
models:
  fitbit_steps:
    # Config indicated by + and applies to all files under models/example/
    example:
      +materialized: view
Empty file added transform/macros/.gitkeep
Empty file.
16 changes: 16 additions & 0 deletions transform/models/fct_steps.sql
@@ -0,0 +1,16 @@
{{
    config(
        materialized = 'incremental',
        incremental_strategy = 'merge',
        unique_key = 'day'
    )
}}

select
    date(date_time) as day,
    safe_cast(value as integer) as steps,
    timestamp_trunc(current_timestamp(), second) as transformed_at
from {{ source('fitbit_ingest', 'steps') }}
{% if is_incremental() %}
where date(date_time) >= (select max(day) from {{ this }})
{% endif %}
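
The merge incremental strategy updates target rows whose `day` already exists and inserts the rest, so re-transforming the overlapping window on every run is safe. A hypothetical pandas analogy of that semantics (both frames are made up):

```python
# Toy analogy of dbt's merge strategy on unique_key='day'; the frames are invented.
import pandas as pd

existing = pd.DataFrame({"day": ["2024-05-01", "2024-05-02"], "steps": [8123, 9100]})
incoming = pd.DataFrame({"day": ["2024-05-02", "2024-05-03"], "steps": [10456, 7321]})

# Incoming rows replace existing rows with the same key; new keys are appended.
merged = (
    pd.concat([existing, incoming])
    .drop_duplicates(subset="day", keep="last")
    .sort_values("day")
    .reset_index(drop=True)
)
print(merged)
```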
5 changes: 5 additions & 0 deletions transform/models/schema.yml
@@ -0,0 +1,5 @@

version: 2

models:
- name: fct_steps
7 changes: 7 additions & 0 deletions transform/models/sources.yml
@@ -0,0 +1,7 @@
version: 2

sources:
  - name: fitbit_ingest
    database: dlt-bq-test
    tables:
      - name: steps
25 changes: 25 additions & 0 deletions transform/profiles.yml
@@ -0,0 +1,25 @@
fitbit_steps:
  target: prod
  outputs:
    prod:
      type: bigquery
      method: service-account
      keyfile: "{{ env_var('GCLOUD_SERVICE_ACCOUNT_KEY_FILE_PATH') }}"
      project: dlt-bq-test
      dataset: fitbit_transform
      threads: 4
      timeout_seconds: 300
      location: US
      priority: interactive
      retries: 1
    dev:
      type: bigquery
      method: service-account
      keyfile: ../GCLOUD_SERVICE_ACCOUNT_KEY_FILE.json
      project: dlt-bq-test
      dataset: fitbit_transform
      threads: 4
      timeout_seconds: 300
      location: US
      priority: interactive
      retries: 1
Empty file added transform/seeds/.gitkeep
Empty file.
Empty file added transform/snapshots/.gitkeep
Empty file.
Empty file added transform/tests/.gitkeep
Empty file.
Binary file added visualize/diagram.png
104 changes: 104 additions & 0 deletions visualize/streamlit_app.py
@@ -0,0 +1,104 @@
import pandas as pd
import streamlit as st
from google.cloud import bigquery
from google.oauth2 import service_account
from PIL import Image


# Authentication and BigQuery client setup
credentials = service_account.Credentials.from_service_account_info(
    st.secrets["gcp_service_account"]
)
client = bigquery.Client(credentials=credentials)


# Cache data loading function
@st.cache_data(ttl=600)
def load_data():
    query = f"""
        SELECT day, steps
        FROM `{client.project}.fitbit_transform.fct_steps`
        ORDER BY day
    """
    data = client.query(query).to_dataframe()
    data['day'] = pd.to_datetime(data['day'])
    return data


# Load data
data = load_data()

# Sort data by day in descending order
data = data.sort_values(by='day', ascending=False)

# Format the 'day' column to show as 'YYYY-MM-DD'
data['day'] = data['day'].dt.strftime('%Y-%m-%d')

# Title of the app
st.title('Steps Data Visualization')

# Create tabs
tab1, tab2, tab3 = st.tabs(["Overview", "Insights", "Pipeline Details"])

# Overview tab
with tab1:
    st.header("Overview")
    st.write("This app visualizes step count data over time, collected from a Fitbit device.")

    st.write("### Key Metrics")
    st.write(f"**Total Days of Data:** {len(data)}")
    st.write(f"**Average Steps per Day:** {data['steps'].mean():,.0f}")
    st.write(f"**Total Steps:** {data['steps'].sum():,}")

    col1, col2 = st.columns([2, 1])

    with col1:
        st.write("### Line Chart")
        st.line_chart(data.set_index('day')['steps'], width=700, height=400)

    with col2:
        st.write("### Data Table")
        st.dataframe(data)

# Insights tab
with tab2:
    st.header("Insights")
    st.write("In this section, we provide insights derived from the step data.")

    # Sort chronologically so the rolling window trails each day, then
    # calculate the 7-day moving average
    data = data.sort_values(by='day')
    data['7_day_avg'] = data['steps'].rolling(window=7).mean()

    st.write("### 7-Day Moving Average of Steps")
    st.line_chart(data.set_index('day')[['steps', '7_day_avg']])

    # Highlight trends
    max_steps_day = data[data['steps'] == data['steps'].max()]['day'].iloc[0]
    min_steps_day = data[data['steps'] == data['steps'].min()]['day'].iloc[0]
    st.write(f"**Day with Maximum Steps:** {max_steps_day} ({data['steps'].max():,} steps)")
    st.write(f"**Day with Minimum Steps:** {min_steps_day} ({data['steps'].min():,} steps)")

    st.write("### Weekly Trends")
    data['week'] = pd.to_datetime(data['day']).dt.isocalendar().week
    weekly_data = data.groupby('week').agg({'steps': 'sum'})
    st.bar_chart(weekly_data)

# Pipeline Details tab
with tab3:
    st.header("Pipeline Details")
    st.write("This section provides details about the data pipeline used to extract, transform, and visualize the step data.")

    image = Image.open('visualize/diagram.png')
    st.image(image, caption="Data Pipeline")

    st.write("""
    ### Pipeline Description
    The data pipeline consists of the following components:
    - **[Fitbit API](https://dev.fitbit.com/build/reference/web-api/)**: Used to extract step count data.
    - **[data load tool (dlt)](https://dlthub.com/)**: Used to load the data into BigQuery.
    - **[Google BigQuery](https://cloud.google.com/bigquery)**: A fully managed data warehouse used to store and query the step data.
    - **[dbt](https://www.getdbt.com/)**: A data transformation tool used to transform and model the data in BigQuery.
    - **[GitHub Actions](https://github.com/features/actions)**: Used to automate the deployment of the data pipeline.
    - **[Streamlit](https://streamlit.io/)**: Used to visualize the data in an interactive web app.

    The process starts by extracting data from the Fitbit API and loading it into BigQuery using dlt. The data is then transformed with dbt and visualized with Streamlit. GitHub Actions automates each run of the pipeline.
    """)
