Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for pgvector [NAIVE] #251

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci_workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
pipx install poetry
- name: Install dependencies
run: |
poetry install
poetry install --all-extras
- name: Run pytest
run: |
poetry run pytest --capture=no
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ smoke-test
.meltano/**
.tox/**
.secrets/**
.idea
.vscode/**
output/**
.env
Expand Down
25 changes: 23 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ tap-carbon-intensity | target-postgres --config /path/to/target-postgres-config.

```bash
pipx install poetry
poetry install
poetry install --all-extras
pipx install pre-commit
pre-commit install
```
Expand Down Expand Up @@ -152,6 +152,8 @@ develop your own Singer taps and targets.

## Data Types

### Mapping

The below table shows how this tap will map between jsonschema datatypes and Postgres datatypes.

| jsonschema | Postgres |
Expand Down Expand Up @@ -202,7 +204,20 @@ The below table shows how this tap will map between jsonschema datatypes and Pos

Note that while object types are mapped directly to jsonb, array types are mapped to a jsonb array.

If a column has multiple jsonschema types, the following order is using to order Postgres types, from highest priority to lowest priority.
When using [pgvector], this type mapping applies, additionally to the table above.

| jsonschema | Postgres |
|------------------------------------------------|----------|
| array (with additional SCHEMA annotations [1]) | vector |

[1] `"additionalProperties": {"storage": {"type": "vector", "dim": 4}}`

### Resolution Order

If a column has multiple jsonschema types, there is a priority list for
resolving the best type candidate, from the highest priority to the
lowest priority.

- ARRAY(JSONB)
- JSONB
- TEXT
Expand All @@ -215,3 +230,9 @@ If a column has multiple jsonschema types, the following order is using to order
- INTEGER
- BOOLEAN
- NOTYPE

When using [pgvector], the `pgvector.sqlalchemy.Vector` type is added to the bottom
of the list.


[pgvector]: https://github.com/pgvector/pgvector
12 changes: 9 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
version: "2.1"
services:
postgres:
image: docker.io/postgres:latest
image: ankane/pgvector:latest
amotl marked this conversation as resolved.
Show resolved Hide resolved
command: postgres -c ssl=on -c ssl_cert_file=/var/lib/postgresql/server.crt -c ssl_key_file=/var/lib/postgresql/server.key -c ssl_ca_file=/var/lib/postgresql/ca.crt -c hba_file=/var/lib/postgresql/pg_hba.conf
environment:
POSTGRES_USER: postgres
Expand All @@ -13,16 +13,19 @@ services:
POSTGRES_INITDB_ARGS: --auth-host=cert
# Not placed in the data directory (/var/lib/postgresql/data) because of https://gist.github.com/mrw34/c97bb03ea1054afb551886ffc8b63c3b?permalink_comment_id=2678568#gistcomment-2678568
volumes:
- ./target_postgres/tests/init.sql:/docker-entrypoint-initdb.d/init.sql
- ./ssl/server.crt:/var/lib/postgresql/server.crt # Certificate verifying the server's identity to the client.
- ./ssl/server.key:/var/lib/postgresql/server.key # Private key to verify the server's certificate is legitimate.
- ./ssl/ca.crt:/var/lib/postgresql/ca.crt # Certificate authority to use when verifying the client's identity to the server.
- ./ssl/pg_hba.conf:/var/lib/postgresql/pg_hba.conf # Configuration file to allow connection over SSL.
ports:
- "5432:5432"
postgres_no_ssl: # Borrowed from https://github.com/MeltanoLabs/tap-postgres/blob/main/.github/workflows/test.yml#L13-L23
image: docker.io/postgres:latest
image: ankane/pgvector:latest
environment:
POSTGRES_PASSWORD: postgres
volumes:
- ./target_postgres/tests/init.sql:/docker-entrypoint-initdb.d/init.sql
ports:
- 5433:5432
ssh:
Expand All @@ -37,17 +40,20 @@ services:
- PASSWORD_ACCESS=false
- USER_NAME=melty
volumes:
- ./target_postgres/tests/init.sql:/docker-entrypoint-initdb.d/init.sql
- ./ssh_tunnel/ssh-server-config:/config/ssh_host_keys:ro
ports:
- "127.0.0.1:2223:2222"
networks:
- inner
postgresdb:
image: postgres:13.0
image: ankane/pgvector:latest
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: main
volumes:
- ./target_postgres/tests/init.sql:/docker-entrypoint-initdb.d/init.sql
networks:
inner:
ipv4_address: 10.5.0.5
Expand Down
64 changes: 57 additions & 7 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ packages = [
python = "<3.13,>=3.8.1"
requests = "^2.25.1"
singer-sdk = ">=0.28,<0.34"
pgvector = { version="^0.2.4", optional = true }
psycopg2-binary = "2.9.9"
sqlalchemy = ">=2.0,<3.0"
sshtunnel = "0.4.0"
Expand All @@ -51,11 +52,17 @@ types-simplejson = "^3.19.0.2"
types-sqlalchemy = "^1.4.53.38"
types-jsonschema = "^4.19.0.3"

[tool.poetry.extras]
pgvector = ["pgvector"]

[tool.mypy]
exclude = "tests"

[[tool.mypy.overrides]]
module = ["sshtunnel"]
module = [
"pgvector.sqlalchemy",
"sshtunnel",
]
ignore_missing_imports = true

[tool.isort]
Expand Down
74 changes: 68 additions & 6 deletions target_postgres/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ def prepare_table( # type: ignore[override]
connection=connection,
)
return table
# To make table reflection work properly with pgvector,
# the module needs to be imported beforehand.
try:
from pgvector.sqlalchemy import Vector # noqa: F401
except ImportError:
self.logger.debug(
"Unable to handle pgvector's `Vector` type. Please install `pgvector`."
)
meta.reflect(connection, only=[table_name])
table = meta.tables[
full_table_name
Expand Down Expand Up @@ -180,8 +188,10 @@ def copy_table_structure(

@contextmanager
def _connect(self) -> t.Iterator[sqlalchemy.engine.Connection]:
with self._engine.connect().execution_options() as conn:
engine = self._engine
with engine.connect().execution_options() as conn:
yield conn
engine.dispose()

def drop_table(
self, table: sqlalchemy.Table, connection: sqlalchemy.engine.Connection
Expand Down Expand Up @@ -278,6 +288,51 @@ def pick_individual_type(jsonschema_type: dict):
if "object" in jsonschema_type["type"]:
return JSONB()
if "array" in jsonschema_type["type"]:
# Select between different kinds of `ARRAY` data types.
#
# This currently leverages an unspecified definition for the Singer SCHEMA,
# using the `additionalProperties` attribute to convey additional type
# information, agnostic of the target database.
#
# In this case, it is about telling different kinds of `ARRAY` types apart:
# Either it is a vanilla `ARRAY`, to be stored into a `jsonb[]` type, or,
# alternatively, it can be a "vector" kind `ARRAY` of floating point
# numbers, effectively what pgvector is storing in its `VECTOR` type.
#
# Still, `type: "vector"` is only a surrogate label here, because other
# database systems may use different types for implementing the same thing,
# and need to translate accordingly.
"""
Schema override rule in `meltano.yml`:

type: "array"
items:
type: "number"
additionalProperties:
storage:
type: "vector"
dim: 4

Produced schema annotation in `catalog.json`:

{"type": "array",
"items": {"type": "number"},
"additionalProperties": {"storage": {"type": "vector", "dim": 4}}}
"""
if (
"additionalProperties" in jsonschema_type
and "storage" in jsonschema_type["additionalProperties"]
):
storage_properties = jsonschema_type["additionalProperties"]["storage"]
if (
"type" in storage_properties
and storage_properties["type"] == "vector"
):
# On PostgreSQL/pgvector, use the corresponding type definition
# from its SQLAlchemy dialect.
from pgvector.sqlalchemy import Vector

return Vector(storage_properties["dim"])
return ARRAY(JSONB())
if jsonschema_type.get("format") == "date-time":
return TIMESTAMP()
Expand Down Expand Up @@ -311,6 +366,13 @@ def pick_best_sql_type(sql_type_array: list):
NOTYPE,
]

try:
from pgvector.sqlalchemy import Vector

precedence_order.append(Vector)
except ImportError:
pass

for sql_type in precedence_order:
for obj in sql_type_array:
if isinstance(obj, sql_type):
Expand All @@ -330,7 +392,7 @@ def create_empty_table( # type: ignore[override]
"""Create an empty target table.

Args:
full_table_name: the target table name.
table_name: the target table name.
schema: the JSON schema for the new table.
primary_keys: list of key properties.
partition_keys: list of partition keys.
Expand Down Expand Up @@ -425,7 +487,7 @@ def _create_empty_column( # type: ignore[override]
"""Create a new column.

Args:
full_table_name: The target table name.
table_name: The target table name.
column_name: The name of the new column.
sql_type: SQLAlchemy type engine to be used in creating the new column.

Expand Down Expand Up @@ -489,7 +551,7 @@ def _adapt_column_type( # type: ignore[override]
"""Adapt table column type to support the new JSON schema type.

Args:
full_table_name: The target table name.
table_name: The target table name.
column_name: The target column name.
sql_type: The new SQLAlchemy type.

Expand Down Expand Up @@ -517,7 +579,7 @@ def _adapt_column_type( # type: ignore[override]
return

# Not the same type, generic type or compatible types
# calling merge_sql_types for assistnace
# calling merge_sql_types for assistance.
compatible_sql_type = self.merge_sql_types([current_type, sql_type])

if str(compatible_sql_type) == str(current_type):
Expand Down Expand Up @@ -720,7 +782,7 @@ def _get_column_type( # type: ignore[override]
"""Get the SQL type of the declared column.

Args:
full_table_name: The name of the table.
table_name: The name of the table.
column_name: The name of the column.

Returns:
Expand Down
5 changes: 5 additions & 0 deletions target_postgres/tests/data_files/array_boolean.singer
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{"type": "SCHEMA", "stream": "array_boolean", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "boolean"}}}}}
{"type": "RECORD", "stream": "array_boolean", "record": {"id": 1, "value": [ true, false ]}}
{"type": "RECORD", "stream": "array_boolean", "record": {"id": 2, "value": [ false ]}}
{"type": "RECORD", "stream": "array_boolean", "record": {"id": 3, "value": [ false, true, true, false ]}}
{"type": "STATE", "value": {"array_boolean": 3}}
Loading
Loading