Data Ingestion
Data ingestion is one of the main components of the pipeline. We use PySpark, the Python API for Apache Spark, to read data using an Avro or CSV schema. This has proved much more efficient than reading data with a standard reader module. The current module supports reading data from local storage and from an SFTP server. Our next aim is to add S3 support to data ingestion as well.
We have also created a custom data reading function that can be used outside the pipeline. It lets researchers read data quickly and with minimal effort.
After installing radarpipeline as a Python library, you can import SparkCSVDataReader, a custom data reader module that helps users read radar data quickly and efficiently. Here's an example of how it works.
The inputs for initializing the SparkCSVDataReader class are config: Dict, required_data: List[str], df_type: str = "pandas", and spark_config: Dict = {}.
config: Location of the data, in the same format as input in config.yaml
required_data: List of variables that should be read
df_type: Type of the data frame. Same as stated in configuration
spark_config: Dict containing the Spark configuration. Same variables as stated in the spark_config section in Configuration
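The four inputs listed above can be assembled and sanity-checked before a reader is created. The following is a minimal sketch, not part of radarpipeline: the helper name build_reader_kwargs is hypothetical, the nested {"config": {"source_path": ...}} layout simply mirrors the example call on this page, and the defaults are the ones given in the signature above. It runs without radarpipeline installed.

```python
from typing import Any, Dict, List, Optional


def build_reader_kwargs(
    source_path: str,
    required_data: List[str],
    df_type: str = "pandas",
    spark_config: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: collect the documented constructor inputs in one dict."""
    if not source_path:
        raise ValueError("source_path must be a non-empty string")
    if not required_data or not all(isinstance(v, str) for v in required_data):
        raise ValueError("required_data must be a non-empty list of variable names")
    return {
        # Nesting copied from the example call on this page (an assumption
        # about what the reader expects, not a verified schema).
        "config": {"config": {"source_path": source_path}},
        "required_data": list(required_data),
        "df_type": df_type,
        # Copy the dict so callers never share a mutable default.
        "spark_config": dict(spark_config or {}),
    }


kwargs = build_reader_kwargs("../mockdata/mockdata", ["android_phone_battery_level"])
print(kwargs)

# With radarpipeline installed, and assuming the constructor accepts the
# documented parameter names as keywords, the dict could be passed on:
# data_reader = SparkCSVDataReader(**kwargs)
```

Validating the inputs up front surfaces a missing path or an empty variable list before any Spark session is started.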
from radarpipeline import radarpipeline
from radarpipeline.io import SparkCSVDataReader
# In this case the source path is the mock data location and the required data is android_phone_battery_level
data_reader = SparkCSVDataReader({"config": {"source_path": "../mockdata/mockdata"}}, ["android_phone_battery_level"])
data = data_reader.read_data()
Logs:
2023-07-18 10:32:50,446 [INFO]: Reading data for user: 2a02e53a-951e-4fd0-b47f-195a87096bd0
2023-07-18 10:32:50,449 [INFO]: Reading data for variable: android_phone_battery_level
2023-07-18 10:32:50,452 [INFO]: Schema found
2023-07-18 10:32:55,040 [INFO]: Reading data for user: 07a69f47-1923-4cfc-b89b-0eefad483f43
2023-07-18 10:32:55,041 [INFO]: Reading data for variable: android_phone_battery_level
2023-07-18 10:32:55,045 [INFO]: Schema found
2023-07-18 10:32:58,568 [INFO]: Reading data for user: 5c0e2ec7-6f85-4041-9669-7145075d1754
2023-07-18 10:32:58,570 [INFO]: Reading data for variable: android_phone_battery_level
2023-07-18 10:32:58,571 [INFO]: Schema found
2023-07-18 10:32:59,113 [INFO]: Reading data for user: 072ddb22-82ef-4b81-8460-41ab096b54bb
2023-07-18 10:32:59,114 [INFO]: Reading data for variable: android_phone_battery_level
2023-07-18 10:32:59,116 [INFO]: Schema found
# Returns all the data keys, i.e. the user IDs
print(data.get_data_keys())
Output:
['2a02e53a-951e-4fd0-b47f-195a87096bd0',
'07a69f47-1923-4cfc-b89b-0eefad483f43',
'5c0e2ec7-6f85-4041-9669-7145075d1754',
'072ddb22-82ef-4b81-8460-41ab096b54bb']
# Returns a dictionary mapping each user ID to its RadarUserData object
print(data.get_data())
Output:
{'2a02e53a-951e-4fd0-b47f-195a87096bd0': <radarpipeline.datalib.radar_user_data.RadarUserData at 0x7f89e706a6a0>,
'07a69f47-1923-4cfc-b89b-0eefad483f43': <radarpipeline.datalib.radar_user_data.RadarUserData at 0x7f89e71c5760>,
'5c0e2ec7-6f85-4041-9669-7145075d1754': <radarpipeline.datalib.radar_user_data.RadarUserData at 0x7f89e7200d90>,
'072ddb22-82ef-4b81-8460-41ab096b54bb': <radarpipeline.datalib.radar_user_data.RadarUserData at 0x7f89e71cdaf0>}
# Returns the combined data across all users for the given variable
data.get_variable_data(variables='android_phone_battery_level')
Output:
key.projectId key.userId key.sourceId value.time value.timeReceived value.batteryLevel value.isPlugged value.status
0 STAGING_PROJECT 2a02e53a-951e-4fd0-b47f-195a87096bd0 f8ba147e-0c18-4169-a66a-9f6f8fb6e7c6 2018-11-24 09:00:52.672000000 2018-11-24 09:00:52.672000000 0.97 False DISCHARGING
1 STAGING_PROJECT 2a02e53a-951e-4fd0-b47f-195a87096bd0 f8ba147e-0c18-4169-a66a-9f6f8fb6e7c6 2018-11-24 09:05:52.667000064 2018-11-24 09:05:52.667000064 0.96 False DISCHARGING
2 STAGING_PROJECT 2a02e53a-951e-4fd0-b47f-195a87096bd0 f8ba147e-0c18-4169-a66a-9f6f8fb6e7c6 2018-11-24 09:10:52.664999936 2018-11-24 09:10:52.664999936 0.95 False DISCHARGING
3 STAGING_PROJECT 2a02e53a-951e-4fd0-b47f-195a87096bd0 f8ba147e-0c18-4169-a66a-9f6f8fb6e7c6 2018-11-24 09:20:52.676000000 2018-11-24 09:20:52.676000000 0.93 False DISCHARGING
4 STAGING_PROJECT 2a02e53a-951e-4fd0-b47f-195a87096bd0 f8ba147e-0c18-4169-a66a-9f6f8fb6e7c6 2018-11-24 09:30:52.776999936 2018-11-24 09:30:52.776999936 0.92 False DISCHARGING
... ... ... ... ... ... ... ... ...
4560 STAGING_PROJECT 072ddb22-82ef-4b81-8460-41ab096b54bb fc184ba0-677f-4aae-a4ea-b2b8c7530b18 2019-10-25 15:45:26.290999808 2019-10-25 15:45:26.290999808 0.64 False DISCHARGING
4561 STAGING_PROJECT 072ddb22-82ef-4b81-8460-41ab096b54bb fc184ba0-677f-4aae-a4ea-b2b8c7530b18 2019-10-25 15:55:25.657000192 2019-10-25 15:55:25.657000192 0.63 False DISCHARGING
4562 STAGING_PROJECT 072ddb22-82ef-4b81-8460-41ab096b54bb fbbed041-f570-4c6c-8a7f-a866ac11fec0 2019-10-27 10:54:34.744999936 2019-10-27 10:54:34.744999936 0.98 False DISCHARGING
4563 STAGING_PROJECT 072ddb22-82ef-4b81-8460-41ab096b54bb fbbed041-f570-4c6c-8a7f-a866ac11fec0 2019-10-27 10:57:26.454999808 2019-10-27 10:57:26.454999808 0.97 False DISCHARGING
4564 STAGING_PROJECT 072ddb22-82ef-4b81-8460-41ab096b54bb fc184ba0-677f-4aae-a4ea-b2b8c7530b18 2019-10-25 23:00:25.706000128 2019-10-25 23:00:25.706000128 0.01 False DISCHARGING
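Once the combined data is available, a typical next step is a per-user summary. The sketch below is illustrative only: to stay runnable without Spark or the mock data, it works on a few rows copied from the output above, held as plain dictionaries (the shape that pandas' DataFrame.to_dict("records") produces). The column names key.userId and value.batteryLevel come from the output; the helper name mean_battery_by_user is hypothetical.

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, Iterable, List


def mean_battery_by_user(records: Iterable[dict]) -> Dict[str, float]:
    """Hypothetical helper: average value.batteryLevel for each key.userId."""
    levels: Dict[str, List[float]] = defaultdict(list)
    for row in records:
        levels[row["key.userId"]].append(row["value.batteryLevel"])
    return {user: mean(values) for user, values in levels.items()}


# A handful of rows taken from the output shown above.
sample_records = [
    {"key.userId": "2a02e53a-951e-4fd0-b47f-195a87096bd0", "value.batteryLevel": 0.97},
    {"key.userId": "2a02e53a-951e-4fd0-b47f-195a87096bd0", "value.batteryLevel": 0.96},
    {"key.userId": "2a02e53a-951e-4fd0-b47f-195a87096bd0", "value.batteryLevel": 0.95},
    {"key.userId": "072ddb22-82ef-4b81-8460-41ab096b54bb", "value.batteryLevel": 0.64},
    {"key.userId": "072ddb22-82ef-4b81-8460-41ab096b54bb", "value.batteryLevel": 0.63},
]

summary = mean_battery_by_user(sample_records)
for user_id, avg_level in summary.items():
    print(f"{user_id}: {avg_level:.3f}")
```

When df_type is "pandas", the same summary can be computed directly on the returned frame, for example with df.groupby("key.userId")["value.batteryLevel"].mean().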