Commit

Merge pull request #3 from gabfl/casting

Implement casting

gabfl authored Oct 5, 2017
2 parents e89556d + f979d13 commit 8e7446e
Showing 6 changed files with 207 additions and 43 deletions.
40 changes: 8 additions & 32 deletions README.md
@@ -6,10 +6,12 @@ It allows writing queries in PostgreSQL SQL syntax using a foreign table. It su

## Features and limitations

- Table partitioning is supported. You can use partitions in your SQL queries.
- Table partitioning is supported. [You can use partitions in your SQL queries](docs/table_partitioning.md).
- Queries are parameterized when sent to BigQuery
- BigQuery's standard SQL support (legacy SQL is not supported)
- Authentication works with a "Service Account" JSON private key
- Authentication works with a "[Service Account](docs/service_account.md)" JSON private key

[Read more](docs/README.md).

## Requirements

@@ -33,7 +35,7 @@ It allows writing queries in PostgreSQL SQL syntax using a foreign table. It su

## Installation

```
```bash
# Install `setuptools` if necessary
pip3 install --upgrade setuptools

@@ -81,37 +83,11 @@ List of options implemented in `CREATE FOREIGN TABLE` syntax:
| `fdw_table` | - | BigQuery table name |
| `fdw_key` | - | Path to the private JSON key (See [Key storage recommendations](docs/key_storage.md)) |
| `fdw_convert_tz` | - | Convert dates and timestamps from UTC to the selected time zone. Example: `'US/Eastern'`. |
| `fdw_group` | `'false'` | See [Remote grouping and counting](remote_grouping.md). |
| `fdw_group` | `'false'` | See [Remote grouping and counting](docs/remote_grouping.md). |
| `fdw_casting` | - | See [Casting](docs/casting.md). |
| `fdw_verbose` | `'false'` | Set to `'true'` to output debug information in PostgreSQL's logs |
| `fdw_sql_dialect` | `'standard'` | BigQuery SQL dialect. Currently only `standard` is supported. |

## Table partitioning

BigQuery **table partitioning is supported**. When partitioning a table, BigQuery creates a pseudo column called `_PARTITIONTIME`.

To use partitions, you need to add a column `partition_date` with the type `date` in the foreign table definition, for example:

```sql
CREATE FOREIGN TABLE my_bigquery_table (
    column1 text,
    column2 bigint,
    partition_date date -- <-- partition!
) SERVER bigquery_srv
OPTIONS (
    fdw_dataset 'my_dataset',
    fdw_table 'my_table',
    fdw_key '/opt/bigquery_fdw/user.json'
);
```

You can then use the partition in the `WHERE` clause:

```sql
SELECT column1, column2
FROM my_bigquery_table
WHERE column1 = 'abc' AND partition_date = '2017-12-01'
```

## More documentation

See [/docs](/docs).
See [bigquery_fdw documentation](docs/README.md).
2 changes: 2 additions & 0 deletions docs/README.md
@@ -9,8 +9,10 @@

## Advanced usage

- [Table partitioning](table_partitioning.md)
- [Time zone conversion support](timezone.md)
- [Remote grouping and counting](remote_grouping.md)
- [Casting (convert column type to another type)](casting.md)

## BigQuery client

77 changes: 77 additions & 0 deletions docs/casting.md
@@ -0,0 +1,77 @@
# Casting (convert column type to another type)

bigquery_fdw provides an option, `fdw_casting`, to use BigQuery's casting feature:

> Cast syntax is used in a query to indicate that the result type of an expression should be converted to some other type.

## Foreign table creation syntax

```sql
CREATE FOREIGN TABLE table_name (
    [...]
) SERVER bigquery_srv
OPTIONS (
    [...]
    fdw_casting '{"column1": "CAST_TO_TYPE", "column2": "CAST_TO_TYPE", ...}'
);
```
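The option value is a dict-style string literal mapping each foreign column name to the BigQuery type it should be cast to. Internally the string is parsed with Python's `ast.literal_eval` and rejected if it does not evaluate to a dict (see `setOptionCasting` in `src/fdw.py` below).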

## Usage example

### Without casting option

```sql
test=# DROP FOREIGN TABLE tmp;

test=# CREATE FOREIGN TABLE tmp (
    column1 bigint,
    timestamp timestamp
) SERVER bigquery_srv
OPTIONS (
    fdw_dataset 'my_dataset',
    fdw_table 'my_table',
    fdw_key '/opt/bigquery_fdw/user.json'
);

test=# SELECT timestamp FROM tmp LIMIT 5;
timestamp
---------------------
2017-09-14 09:12:32
2017-09-14 09:12:32
2017-09-14 09:12:32
2017-09-12 00:36:16
2017-09-12 17:23:12
(5 rows)
```

### With casting option

```sql
test=# DROP FOREIGN TABLE tmp;

test=# CREATE FOREIGN TABLE tmp (
    column1 bigint,
    timestamp timestamp
) SERVER bigquery_srv
OPTIONS (
    fdw_dataset 'my_dataset',
    fdw_table 'my_table',
    fdw_key '/opt/bigquery_fdw/user.json',
    fdw_casting '{"timestamp": "DATE"}'
);

test=# SELECT timestamp FROM tmp LIMIT 5;
timestamp
---------------------
2017-09-11 00:00:00
2017-09-11 00:00:00
2017-09-11 00:00:00
2017-09-11 00:00:00
2017-09-11 00:00:00
(5 rows)
```
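Because the cast to `DATE` happens on the BigQuery side, the time portion is truncated there; PostgreSQL then renders each value as midnight, since the foreign column is still declared as `timestamp`.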


## External links

- [BigQuery casting](https://cloud.google.com/bigquery/docs/reference/standard-sql/functions-and-operators#casting)
2 changes: 1 addition & 1 deletion docs/operators.md
@@ -27,4 +27,4 @@

## External links

- [BigQuery functions and operators](https://cloud.google.com/bigquery/docs/reference/standard-sql/functions-and-operators)
- [BigQuery operators](https://cloud.google.com/bigquery/docs/reference/standard-sql/functions-and-operators#operators)
26 changes: 26 additions & 0 deletions docs/table_partitioning.md
@@ -0,0 +1,26 @@
# Table partitioning

BigQuery **table partitioning is supported**. When partitioning a table, BigQuery creates a pseudo column called `_PARTITIONTIME`.

To use partitions, you need to add a `partition_date` column of type `date` to the foreign table definition, for example:

```sql
CREATE FOREIGN TABLE my_bigquery_table (
    column1 text,
    column2 bigint,
    partition_date date -- <-- partition!
) SERVER bigquery_srv
OPTIONS (
    fdw_dataset 'my_dataset',
    fdw_table 'my_table',
    fdw_key '/opt/bigquery_fdw/user.json'
);
```

You can then use the partition in the `WHERE` clause:

```sql
SELECT column1, column2
FROM my_bigquery_table
WHERE column1 = 'abc' AND partition_date = '2017-12-01'
```
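Under the hood, bigquery_fdw maps the `partition_date` pseudo column to BigQuery's `_PARTITIONTIME` pseudo column in the generated `WHERE` clause (see `buildWhereClause` in `src/fdw.py`), so the filter is pushed down to BigQuery and only the matching partitions are scanned.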
103 changes: 93 additions & 10 deletions src/fdw.py
@@ -14,7 +14,8 @@ class ConstantForeignDataWrapper(ForeignDataWrapper):
    bq = None  # BqClient instance
    partitionPseudoColumn = 'partition_date'  # Name of the partition pseudo column
    partitionPseudoColumnValue = None  # If a partition is used, its value will be stored in this variable to return it to PostgreSQL
    countPseudoColumn = '_fdw_count'
    countPseudoColumn = '_fdw_count'  # Pseudo column to fetch `count(*)` when using the remote counting and grouping feature
    castingRules = None  # Dict of casting rules when using the `fdw_casting` option

    def __init__(self, options, columns):
        """
@@ -29,8 +30,9 @@ def __init__(self, options, columns):

        # Set table columns
        self.columns = columns

        # Set data types mapping
        # Set data types and conversion rules mapping
        self.setDatatypes()
        self.setConversionRules()

    def setOptions(self, options):
        """
@@ -50,6 +52,9 @@

            # Set grouping option
            self.setOptionGroupBy(options.get('fdw_group'))

            # Set casting rules
            self.setOptionCasting(options.get('fdw_casting'))
        except KeyError:
            log_to_postgres("You must specify these options when creating the FDW: fdw_key, fdw_dataset, fdw_table", ERROR)

@@ -73,6 +78,28 @@ def setDatatypes(self):
            datatype('timestamp without time zone', 'DATETIME', 'DATETIME'),
        ]

    def setConversionRules(self):
        """
        Set list of allowed conversion rules
        """

        # Create a named tuple
        conversionRule = namedtuple('conversionRule', 'bq_standard_from bq_standard_to')

        self.conversionRules = [
            conversionRule('INT64', ['BOOL', 'FLOAT64', 'INT64', 'STRING']),
            conversionRule('FLOAT64', ['FLOAT64', 'INT64', 'STRING']),
            conversionRule('BOOL', ['BOOL', 'INT64', 'STRING']),
            conversionRule('STRING', ['BOOL', 'BYTES', 'DATE', 'DATETIME', 'FLOAT64', 'INT64', 'STRING', 'TIME', 'TIMESTAMP']),
            conversionRule('BYTES', ['BYTES', 'STRING']),
            conversionRule('DATE', ['DATE', 'DATETIME', 'STRING', 'TIMESTAMP']),
            conversionRule('DATETIME', ['DATE', 'DATETIME', 'STRING', 'TIME', 'TIMESTAMP']),
            conversionRule('TIME', ['STRING', 'TIME']),
            conversionRule('TIMESTAMP', ['DATE', 'DATETIME', 'STRING', 'TIME', 'TIMESTAMP']),
            conversionRule('ARRAY', ['ARRAY']),
            conversionRule('STRUCT', ['STRUCT']),
        ]

    def setOptionSqlDialect(self, standard_sql):
        """
        Set a flag for the SQL dialect.
@@ -100,6 +127,28 @@ def setOptionGroupBy(self, group):

        self.groupBy = False

    def setOptionCasting(self, castingRules):
        """
        Conversion rules are received as a string, for example: '{"key": "FLOAT64", "datetime": "DATE"}'
        The string will be converted to a dict
        """

        if castingRules:
            # Cast the string to a dict
            try:
                import ast
                self.castingRules = ast.literal_eval(castingRules)
            except Exception as e:
                log_to_postgres("fdw_casting conversion failed: `" + str(e) + "`", ERROR)

            # For security reasons, ensure that the string was correctly cast to a dict
            try:
                if type(self.castingRules) is not dict:
                    raise ValueError('fdw_casting format is incorrect.')
            except Exception as e:
                log_to_postgres("fdw_casting conversion failed: `" + str(e) + "`", ERROR)
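As a standalone illustration, the parse-and-validate step above behaves roughly like this minimal sketch (the helper name `parse_casting_rules` is hypothetical):

```python
import ast

def parse_casting_rules(raw):
    # Parse a dict-style literal such as '{"timestamp": "DATE"}'.
    # ast.literal_eval only evaluates Python literals, so arbitrary
    # code in the option string is rejected rather than executed.
    rules = ast.literal_eval(raw)
    if not isinstance(rules, dict):
        raise ValueError('fdw_casting format is incorrect.')
    return rules

print(parse_casting_rules('{"timestamp": "DATE"}'))  # {'timestamp': 'DATE'}
```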

    def getClient(self):
        """
        Manage a pool of instances of BqClient
@@ -230,11 +279,17 @@ def buildColumnList(self, columns, usage='SELECT'):
                # Get column data type
                dataType = self.getBigQueryDatatype(column)

                # Save column original name
                columnOriginalName = column

                # If the data type is a date or a timestamp
                if dataType in ['DATE', 'TIMESTAMP']:
                    column = self.setTimeZone(column, dataType, useAliases)
                    column = self.setTimeZone(column, dataType)

                # Data type casting
                column = self.castColumn(column, columnOriginalName, dataType)

                clause += column + ", "
                clause += column + " " + self.addColumnAlias(columnOriginalName, useAliases) + ", "

        # Remove final `, `
        clause = clause.strip(', ')
@@ -243,17 +298,41 @@

        return clause

    def setTimeZone(self, column, dataType, useAliases=True):
    def setTimeZone(self, column, dataType):
        """
        If the option `fdw_convert_tz` is used, convert the time zone automatically from UTC to the desired time zone
        """

        # Option is set
        if self.convertToTz:
            if dataType == 'DATE':  # BigQuery column type is `DATE`
                return 'DATE(' + column + ', "' + self.convertToTz + '") ' + self.addColumnAlias(column, useAliases)
                return 'DATE(' + column + ', "' + self.convertToTz + '")'
            else:  # BigQuery column type is `TIMESTAMP`
                return 'DATETIME(' + column + ', "' + self.convertToTz + '") ' + self.addColumnAlias(column, useAliases)
                return 'DATETIME(' + column + ', "' + self.convertToTz + '")'

        # Option is not set
        return column
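For instance, with `fdw_convert_tz` set to `'US/Eastern'`, the method rewrites column references roughly as follows (a standalone sketch; the column names are invented for illustration):

```python
def set_time_zone(column, data_type, convert_to_tz='US/Eastern'):
    # Mirrors setTimeZone above: wrap DATE/TIMESTAMP columns in the
    # matching BigQuery conversion function for the configured time zone.
    if convert_to_tz:
        if data_type == 'DATE':
            return 'DATE(' + column + ', "' + convert_to_tz + '")'
        return 'DATETIME(' + column + ', "' + convert_to_tz + '")'
    return column

print(set_time_zone('created_on', 'DATE'))       # DATE(created_on, "US/Eastern")
print(set_time_zone('created_at', 'TIMESTAMP'))  # DATETIME(created_at, "US/Eastern")
```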

    def castColumn(self, column, columnOriginalName, dataType):
        """
        If the option `fdw_casting` is used, this method will attempt to cast the column to the new type
        """

        if self.castingRules and columnOriginalName in self.castingRules:  # If we have a casting rule for this column
            # Get the desired casting
            castTo = self.castingRules[columnOriginalName]

            # Find if we have a matching rule
            rule = [conversionRule for conversionRule in self.conversionRules if conversionRule.bq_standard_from == dataType.upper()]

            if rule:
                # Check if casting from the original data type to the new one is supported
                if castTo.upper() in rule[0].bq_standard_to:
                    return 'CAST(' + column + ' as ' + castTo.upper() + ')'
                else:
                    log_to_postgres("Casting from the data type `" + dataType.upper() + "` to the data type `" + castTo.upper() + "` is not permitted.", ERROR)
            else:
                log_to_postgres("Casting from the data type `" + dataType.upper() + "` is not permitted.", ERROR)

        # Option is not set
        return column
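A rough, self-contained sketch of this check-then-wrap logic, reduced to the single conversion rule used in the docs example (names are illustrative):

```python
from collections import namedtuple

conversionRule = namedtuple('conversionRule', 'bq_standard_from bq_standard_to')

# A single entry from the conversionRules whitelist defined earlier
rules = [conversionRule('TIMESTAMP', ['DATE', 'DATETIME', 'STRING', 'TIME', 'TIMESTAMP'])]

def cast_column(column, data_type, cast_to):
    # Only emit a CAST if the target type is whitelisted for the source type
    rule = [r for r in rules if r.bq_standard_from == data_type.upper()]
    if rule and cast_to.upper() in rule[0].bq_standard_to:
        return 'CAST(' + column + ' as ' + cast_to.upper() + ')'
    raise ValueError('Casting from `' + data_type + '` to `' + cast_to + '` is not permitted.')

print(cast_column('timestamp', 'TIMESTAMP', 'DATE'))  # CAST(timestamp as DATE)
```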
@@ -282,7 +361,7 @@ def buildWhereClause(self, quals):
        clause += "WHERE "
        for qual in quals:
            if qual.field_name == self.partitionPseudoColumn:
                clause += "_PARTITIONTIME = ?"
                clause += "_PARTITIONTIME " + str(self.getOperator(qual.operator)) + " ?"
                parameters.append(self.setParameter(qual.field_name, 'TIMESTAMP', qual.value))  # Force data type to `TIMESTAMP`

                # Store the value to return it to PostgreSQL
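                # Note: using getOperator() rather than a hard-coded `=` should
                # allow range filters on the partition to be pushed down as well
                # (e.g. `partition_date >= '2017-12-01'`)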
@@ -321,7 +400,7 @@ def getOperator(self, operator):
        else:  # Operator is not supported
            log_to_postgres("Operator `" + operator + "` is not currently supported", ERROR)

    def getBigQueryDatatype(self, column):
    def getBigQueryDatatype(self, column, dialect='standard'):
        """
        Returns the BigQuery standard SQL data type of a PostgreSQL column
@@ -334,7 +413,11 @@ def getBigQueryDatatype(self, column):

        for datatype in self.datatypes:  # For each known data type
            if datatype.postgres == pgDatatype:  # If the PostgreSQL data type matches the known data type
                return datatype.bq_standard  # Returns the equivalent BigQuery data type
                # Returns the equivalent BigQuery data type
                if dialect == 'legacy':
                    return datatype.bq_legacy
                else:
                    return datatype.bq_standard

        # Return a default data type in an attempt to save the day
        return 'STRING'