Skip to content

Commit

Permalink
enables local weaviate (#591)
Browse files Browse the repository at this point in the history
* enables local weaviate and various vectorizers

* tests local weaviate
  • Loading branch information
rudolfix authored Aug 27, 2023
1 parent 0d14605 commit 784c1cf
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 69 deletions.
34 changes: 34 additions & 0 deletions .github/weaviate-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
version: '3.4'
services:
weaviate:
command:
- --host
- 0.0.0.0
- --port
- '8080'
- --scheme
- http
image: semitechnologies/weaviate:1.21.1
ports:
- 8080:8080
volumes:
- weaviate_data
restart: on-failure:0
environment:
QUERY_DEFAULTS_LIMIT: 25
AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
PERSISTENCE_DATA_PATH: '/var/lib/weaviate'
DEFAULT_VECTORIZER_MODULE: 'none'
ENABLE_MODULES: 'text2vec-contextionary,text2vec-cohere,text2vec-huggingface,text2vec-palm,text2vec-openai,generative-openai,generative-cohere,generative-palm,ref2vec-centroid,reranker-cohere,qna-openai'
CONTEXTIONARY_URL: contextionary:9999
CLUSTER_HOSTNAME: 'node1'
contextionary:
environment:
OCCURRENCE_WEIGHT_LINEAR_FACTOR: 0.75
EXTENSIONS_STORAGE_MODE: weaviate
EXTENSIONS_STORAGE_ORIGIN: http://weaviate:8080
NEIGHBOR_OCCURRENCE_IGNORE_PERCENTILE: 5
ENABLE_COMPOUND_SPLITTING: 'false'
image: semitechnologies/contextionary:en0.16.0-v1.2.1
ports:
- 9999:9999
14 changes: 12 additions & 2 deletions .github/workflows/test_local_destinations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ env:
RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
RUNTIME__LOG_LEVEL: ERROR
RUNTIME__DLTHUB_TELEMETRY_SEGMENT_WRITE_KEY: TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB
ACTIVE_DESTINATIONS: "[\"duckdb\", \"postgres\", \"filesystem\"]"
ACTIVE_DESTINATIONS: "[\"duckdb\", \"postgres\", \"filesystem\", \"weaviate\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\"]"

DESTINATION__WEAVIATE__VECTORIZER: text2vec-contextionary
DESTINATION__WEAVIATE__MODULE_CONFIG: "{\"text2vec-contextionary\": {\"vectorizeClassName\": false, \"vectorizePropertyName\": true}}"

jobs:
get_docs_changes:
uses: ./.github/workflows/get_docs_changes.yml
Expand Down Expand Up @@ -58,6 +61,9 @@ jobs:
- name: Check out
uses: actions/checkout@master

- name: Start weaviate
run: docker-compose -f ".github/weaviate-compose.yml" up -d

- name: Setup Python
uses: actions/setup-python@v4
with:
Expand All @@ -78,9 +84,13 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations

- name: Install dependencies
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate

- run: poetry run pytest tests/load tests/cli
name: Run tests Linux
env:
DESTINATION__POSTGRES__CREDENTIALS: postgresql://loader:loader@localhost:5432/dlt_data

- name: Stop weaviate
if: always()
run: docker-compose -f ".github/weaviate-compose.yml" down -v
11 changes: 7 additions & 4 deletions dlt/destinations/weaviate/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@

@configspec
class WeaviateCredentials(CredentialsConfiguration):
url: str
api_key: str
url: str = "http://localhost:8080"
api_key: Optional[str]
additional_headers: Optional[Dict[str, str]] = None

def __str__(self) -> str:
Expand All @@ -32,8 +32,11 @@ class WeaviateClientConfiguration(DestinationClientDwhConfiguration):
batch_workers: int = 1
batch_consistency: TWeaviateBatchConsistency = "ONE"
batch_retries: int = 5
conn_timeout: int = 10
read_timeout: int = 3*60

conn_timeout: float = 10.0
read_timeout: float = 3*60.0
startup_period: int = 5

dataset_separator: str = "_"

credentials: WeaviateCredentials
Expand Down
90 changes: 39 additions & 51 deletions dlt/destinations/weaviate/weaviate_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@
"blob": "binary",
}

NON_VECTORIZED_CLASS = {
"vectorizer": "none",
"vectorIndexConfig": {
"skip": True,
}
}


def wrap_weaviate_error(f: TFun) -> TFun:
@wraps(f)
Expand Down Expand Up @@ -243,9 +250,12 @@ def sentinel_class(self) -> str:

@staticmethod
def create_db_client(config: WeaviateClientConfiguration) -> weaviate.Client:
auth_client_secret: weaviate.AuthApiKey = weaviate.AuthApiKey(api_key=config.credentials.api_key) if config.credentials.api_key else None
return weaviate.Client(
url=config.credentials.url,
auth_client_secret=weaviate.AuthApiKey(api_key=config.credentials.api_key),
timeout_config=(config.conn_timeout, config.read_timeout),
startup_period=config.startup_period,
auth_client_secret=auth_client_secret,
additional_headers=config.credentials.additional_headers,
)

Expand Down Expand Up @@ -389,7 +399,7 @@ def is_storage_initialized(self) -> bool:

def create_sentinel_class(self) -> None:
"""Create an empty class to indicate that the storage is initialized."""
self.create_class({}, full_class_name=self.sentinel_class)
self.create_class(NON_VECTORIZED_CLASS, full_class_name=self.sentinel_class)

def delete_sentinel_class(self) -> None:
"""Delete the sentinel class."""
Expand Down Expand Up @@ -429,7 +439,7 @@ def _execute_schema_update(self, only_tables: Iterable[str]) -> None:
if len(new_columns) > 0:
if exists:
for column in new_columns:
prop = self._make_property_schema(column["name"], column, True)
prop = self._make_property_schema(column["name"], column)
self.create_class_property(table_name, prop)
else:
class_schema = self.make_weaviate_class_schema(table_name)
Expand Down Expand Up @@ -488,77 +498,55 @@ def get_schema_by_hash(self, schema_hash: str) -> Optional[StorageSchemaInfo]:

def make_weaviate_class_schema(self, table_name: str) -> Dict[str, Any]:
"""Creates a Weaviate class schema from a table schema."""
if table_name.startswith(self.schema._dlt_tables_prefix):
return self._make_non_vectorized_class_schema(table_name)
class_schema: Dict[str, Any] = {
"class": table_name,
"properties": self._make_properties(table_name),
}

return self._make_vectorized_class_schema(table_name)
# check if any column requires vectorization
if get_columns_names_with_prop(self.schema.get_table(table_name), VECTORIZE_HINT): # type: ignore
class_schema.update(self._vectorizer_config)
else:
class_schema.update(NON_VECTORIZED_CLASS)

def _make_properties(
self, table_name: str, is_vectorized_class: bool = True
) -> List[Dict[str, Any]]:
return class_schema

def _make_properties(self, table_name: str) -> List[Dict[str, Any]]:
"""Creates a Weaviate properties schema from a table schema.
Args:
table: The table name for which columns should be converted to properties
is_vectorized_class: Controls whether the `moduleConfig` should be
added to the properties schema. This is only needed for
vectorized classes.
"""

return [
self._make_property_schema(column_name, column, is_vectorized_class)
self._make_property_schema(column_name, column)
for column_name, column in self.schema.get_table_columns(table_name).items()
]

def _make_property_schema(
self, column_name: str, column: TColumnSchema, is_vectorized_class: bool
) -> Dict[str, Any]:
def _make_property_schema(self, column_name: str, column: TColumnSchema) -> Dict[str, Any]:
extra_kv = {}

if is_vectorized_class:
vectorizer_name = self._vectorizer_config["vectorizer"]

# x-weaviate-vectorize: (bool) means that this field should be vectorized
if not column.get(VECTORIZE_HINT, False):
# do not vectorize
extra_kv["moduleConfig"] = {
vectorizer_name: {
"skip": True,
}
vectorizer_name = self._vectorizer_config["vectorizer"]
# x-weaviate-vectorize: (bool) means that this field should be vectorized
if not column.get(VECTORIZE_HINT, False):
# tell weaviate explicitly to not vectorize when column has no vectorize hint
extra_kv["moduleConfig"] = {
vectorizer_name: {
"skip": True,
}
}

# x-weaviate-tokenization: (str) specifies the method to use
# for tokenization
if TOKENIZATION_HINT in column:
extra_kv["tokenization"] = column[TOKENIZATION_HINT] # type: ignore
# x-weaviate-tokenization: (str) specifies the method to use
# for tokenization
if TOKENIZATION_HINT in column:
extra_kv["tokenization"] = column[TOKENIZATION_HINT] # type: ignore

return {
"name": column_name,
"dataType": [self._to_db_type(column["data_type"])],
**extra_kv,
}

def _make_vectorized_class_schema(self, table_name: str) -> Dict[str, Any]:
properties = self._make_properties(table_name)

return {
"class": table_name,
"properties": properties,
**self._vectorizer_config,
}

def _make_non_vectorized_class_schema(self, table_name: str) -> Dict[str, Any]:
properties = self._make_properties(table_name, is_vectorized_class=False)

return {
"class": table_name,
"properties": properties,
"vectorizer": "none",
"vectorIndexConfig": {
"skip": True,
},
}

def start_file_load(
self, table: TTableSchema, file_path: str, load_id: str
) -> LoadJob:
Expand Down
29 changes: 24 additions & 5 deletions docs/website/docs/dlt-ecosystem/destinations/weaviate.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,16 @@ api_key = "your-weaviate-api-key"
X-OpenAI-Api-Key = "your-openai-api-key"
```

In this setup guide, we are using the [Weaviate Cloud Services](https://console.weaviate.cloud/) to get a Weaviate instance and [OpenAI API](https://platform.openai.com/) for generating embeddings through the [text2vec-openai](https://weaviate.io/developers/weaviate/modules/retriever-vectorizer-modules/text2vec-openai) module. You can host your own weaviate instance using docker compose, kubernetes or embedded. Refer to Weaviate's [How-to: Install](https://weaviate.io/developers/weaviate/installation) for details.
In this setup guide, we are using the [Weaviate Cloud Services](https://console.weaviate.cloud/) to get a Weaviate instance and [OpenAI API](https://platform.openai.com/) for generating embeddings through the [text2vec-openai](https://weaviate.io/developers/weaviate/modules/retriever-vectorizer-modules/text2vec-openai) module.

You can host your own weaviate instance using docker compose, kubernetes or embedded. Refer to Weaviate's [How-to: Install](https://weaviate.io/developers/weaviate/installation) for details. In that case you can skip the credentials part altogether:

```toml
[destination.weaviate.credentials.additional_headers]
X-OpenAI-Api-Key = "your-openai-api-key"
```
The `url` will default to **http://localhost:8080** and `api_key` is not defined - which are the defaults for Weaviate container.


3. Define the source of the data. For starters, let's load some data from a simple data structure:

Expand Down Expand Up @@ -223,14 +232,24 @@ Reserved property names like `id` or `additional` are prefixed with underscores

- `batch_size`: (int) the number of items in the batch insert request. The default is 100.
- `batch_workers`: (int) the maximal number of concurrent threads to run batch import. The default is 1.
- batch_consistency: (str) the number of replica nodes in the cluster that must acknowledge a write or read request before it's considered successful. The available consistency levels include:
- `batch_consistency`: (str) the number of replica nodes in the cluster that must acknowledge a write or read request before it's considered successful. The available consistency levels include:
- `ONE`: Only one replica node needs to acknowledge.
- `QUORUM`: Majority of replica nodes (calculated as `replication_factor / 2 + 1`) must acknowledge.
- `ALL`: All replica nodes in the cluster must send a successful response.
The default is `ONE`.
- batch_retries: (int) number of retries to create a batch that failed with ReadTimeout. The default is 5.
- dataset_separator: (str) the separator to use when generating the class names in Weaviate.
- vectorizer: (str) the name of [the vectorizer](https://weaviate.io/developers/weaviate/modules/retriever-vectorizer-modules) to use. The default is `text2vec-openai`.
- `batch_retries`: (int) number of retries to create a batch that failed with ReadTimeout. The default is 5.
- `dataset_separator`: (str) the separator to use when generating the class names in Weaviate.
- `conn_timeout` and `read_timeout`: (float) to set timeouts (in seconds) when connecting and reading from REST API. defaults to (10.0, 180.0)
- `startup_period` (int) - how long to wait for weaviate to start
- `vectorizer`: (str) the name of [the vectorizer](https://weaviate.io/developers/weaviate/modules/retriever-vectorizer-modules) to use. The default is `text2vec-openai`.
- `moduleConfig`: (dict) configurations of various Weaviate modules

Below is an example that configures the **contextionary** vectorizer. You can put this into `config.toml` - no secrets are passed.
```toml
[destination.weaviate]
vectorizer="text2vec-contextionary"
module_config={text2vec-contextionary = { vectorizeClassName = false, vectorizePropertyName = true}}
```

### dbt support

Expand Down
17 changes: 12 additions & 5 deletions tests/load/weaviate/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,28 +283,35 @@ def test_merge_github_nested() -> None:
assert_class(p, "Issues", expected_items_count=17)


@pytest.mark.skip(reason="skip to avoid race condition with other tests")
def test_empty_dataset_allowed() -> None:
# weaviate dataset_name is optional so dataset name won't be autogenerated when not explicitly passed
p = dlt.pipeline(destination="weaviate", full_refresh=True)
# check if we use localhost
client: WeaviateClient = p._destination_client()
if "localhost" not in client.config.credentials.url:
pytest.skip("skip to avoid race condition with other tests")

assert p.dataset_name is None
info = p.run(weaviate_adapter(["a", "b", "c"], vectorize=["value"]))
info = p.run(weaviate_adapter(["context", "created", "not a stop word"], vectorize=["value"]))
# dataset in load info is empty
assert info.dataset_name is None
# check weaviate client props
client: WeaviateClient = p._get_destination_client(p.default_schema)
client = p._destination_client()
assert client.dataset_name is None
assert client.sentinel_class == "DltSentinelClass"
# also check trace
print(p.last_trace.steps[-1].step_info)
assert_class(p, "Content", expected_items_count=3)


@pytest.mark.skip(reason="skip to avoid race condition with other tests")
def test_vectorize_property_without_data() -> None:
# we request to vectorize "content" but property with this name does not appear in the data
# an incomplete column was created and it can't be created at destination
p = dlt.pipeline(destination="weaviate", full_refresh=True)
# check if we use localhost
client: WeaviateClient = p._destination_client()
if "localhost" not in client.config.credentials.url:
pytest.skip("skip to avoid race condition with other tests")

assert p.dataset_name is None
info = p.run(weaviate_adapter(["a", "b", "c"], vectorize=["content"]))
# dataset in load info is empty
Expand Down
11 changes: 9 additions & 2 deletions tests/load/weaviate/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import dlt
from dlt.common.pipeline import PipelineContext
from dlt.common.configuration.container import Container
from dlt.common.schema.utils import get_columns_names_with_prop

from dlt.destinations.weaviate.weaviate_client import WeaviateClient
from dlt.destinations.weaviate.weaviate_adapter import VECTORIZE_HINT, TOKENIZATION_HINT
Expand All @@ -22,6 +23,7 @@ def assert_class(
items: List[Any] = None,
) -> None:
client: WeaviateClient = pipeline._destination_client()
vectorizer_name: str = client._vectorizer_config["vectorizer"]

# Check if class exists
schema = client.get_class_schema(class_name)
Expand All @@ -35,14 +37,19 @@ def assert_class(
# make sure expected columns are vectorized
for column_name, column in columns.items():
prop = properties[column_name]
# text2vec-openai is the default
assert prop["moduleConfig"]["text2vec-openai"]["skip"] == (
assert prop["moduleConfig"][vectorizer_name]["skip"] == (
not column.get(VECTORIZE_HINT, False)
)
# tokenization
if TOKENIZATION_HINT in column:
assert prop["tokenization"] == column[TOKENIZATION_HINT]

# if there's a single vectorize hint, class must have vectorizer enabled
if get_columns_names_with_prop(pipeline.default_schema.get_table(class_name), VECTORIZE_HINT):
assert schema["vectorizer"] == vectorizer_name
else:
assert schema["vectorizer"] == "none"

# response = db_client.query.get(class_name, list(properties.keys())).do()
response = client.query_class(class_name, list(properties.keys())).do()
objects = response["data"]["Get"][client.make_full_name(class_name)]
Expand Down

0 comments on commit 784c1cf

Please sign in to comment.