Skip to content

Commit

Permalink
feat: enable vpc endpoints for each construct (#674)
Browse files Browse the repository at this point in the history
* feat: enable vpc endpoints for each construct

---------

Co-authored-by: Erdem Ayyildiz <[email protected]>
Co-authored-by: Alain Krok <[email protected]>
  • Loading branch information
3 people authored Oct 1, 2024
1 parent cd9d77c commit 00367ad
Show file tree
Hide file tree
Showing 27 changed files with 4,028 additions and 7,728 deletions.
2 changes: 1 addition & 1 deletion apidocs/classes/QaAppsyncOpensearch.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ represents the scope for all the resources.

**id**: `string`

this is a a scope-unique id.
this is a scope-unique id.

**props**: [`QaAppsyncOpensearchProps`](../interfaces/QaAppsyncOpensearchProps.md)

Expand Down
2 changes: 1 addition & 1 deletion apidocs/interfaces/QaAppsyncOpensearchProps.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ Existing Amazon OpenSearch Service domain.

> `readonly` `optional` **existingOpensearchServerlessCollection**: `CfnCollection`
Existing Amazon Amazon OpenSearch Serverless collection.
Existing Amazon OpenSearch Serverless collection.

#### Default

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@

Creates default AmazonAuroraVectorStore.

It includes creation of a VPC with 3 subnets (public,
private with NAT Gateway, private without NAT Gateway),
It includes creation of a VPC with 1 subnets (private isolated),
with the Amazon Aurora Serverless V2 Cluster.
The cluster has 1 writer/reader of PostgreSQL version 15.5
instance (min capacity 0.5, max capacity 4). Lambda custom
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ index of appropriate dimensions in the Aurora database.
The VPC where the Aurora Vector Store will be deployed in.
The provided VPC must have at least one subnet of type
`ec2.SubnetType.PUBLIC` and at least one subnet of type
`ec2.SubnetType.PRIVATE_WITH_EGRESS`. If no subnets of these
`ec2.SubnetType.PRIVATE_ISOLATED`. If no subnets of these
types are available, the deployment will fail.
If not provided, a new VPC with the required subnet
configuration will be created automatically.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,28 @@
import os
import time

from .cr_types import CustomResourceRequest, CustomResourceResponse
from .opensearch_index import on_event as on_event_opensearch_index

LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")

logger = logging.getLogger(__name__)
logger.setLevel(LOG_LEVEL)

try:
from .cr_types import CustomResourceRequest, CustomResourceResponse
from .opensearch_index import on_event as on_event_opensearch_index
from .opensearch_vpc_endpoint import on_event as on_event_opensearch_vpc_endpoint
except ImportError as e:
logger.error(f"Error importing modules: {e}")
logger.exception(e)


def on_event(event: CustomResourceRequest, context):
logger.debug(f"Received event: {event}")
resource_type = event["ResourceType"]

if resource_type == "Custom::OpenSearchIndex":
return on_event_opensearch_index(event, context)
if resource_type == "Custom::VpcEndpoint":
return on_event_opensearch_vpc_endpoint(event, context)
if resource_type == "Custom::NoOp":
logger.info("NoOp resource type")
# Return a response with a physical resource ID that is not empty.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
#
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance
# with the License. A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the 'license' file accompanying this file. This file is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions
# and limitations under the License.
#

from opensearchpy import (OpenSearch, AuthorizationException)

import boto3
import logging
import os
import uuid

from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential_jitter,
)

from typing import List, TypedDict

from custom_resources.cr_types import CustomResourceRequest, CustomResourceResponse
from opensearch_index import connect_opensearch

LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")

logger = logging.getLogger(__name__)
logger.setLevel(LOG_LEVEL)


class VpcEndpointProperties(TypedDict):
Endpoint: str
DomainArn: str
SubnetIds: List[str]
SecurityGroupIds: List[str]

def validate_event(event: CustomResourceRequest[VpcEndpointProperties]) -> bool:
if event["ResourceProperties"] is None:
raise ValueError("ResourceProperties is required")
if event["ResourceProperties"]["Endpoint"] is None:
raise ValueError("Endpoint is required")
if event["ResourceProperties"]["DomainArn"] is None:
raise ValueError("DomainArn is required")
if event["ResourceProperties"]["SubnetIds"] is None:
raise ValueError("SubnetIds is required")
if event["ResourceProperties"]["SecurityGroupIds"] is None:
raise ValueError("SecurityGroupIds is required")

@retry(
retry=retry_if_exception_type(AuthorizationException),
stop=stop_after_attempt(30),
wait=wait_exponential_jitter(1, 3),
)
def handle_create(
client: OpenSearch,
domain_arn: str,
subnet_ids: List[str],
security_group_ids: List[str],
client_token: str
):
try:
response = client.create_vpc_endpoint(
DomainArn= domain_arn,
VpcOptions={
"SubnetIds": subnet_ids,
"SecurityGroupIds": security_group_ids,
},
ClientToken=client_token,
)
except Exception as e:
logger.error(f"Error creating VPC endpoint for domain: {domain_arn}")
logger.exception(e)
raise e
return response["VpcEndpoint"]["VpcEndpointId"]

@retry(
retry=retry_if_exception_type(AuthorizationException),
stop=stop_after_attempt(30),
wait=wait_exponential_jitter(1, 3),
)
def handle_update(
client: OpenSearch,
vpc_endpoint_id: str,
subnet_ids: List[str],
security_group_ids: List[str]
):
try:
response = client.update_vpc_endpoint(
VpcEndpointId=vpc_endpoint_id,
VpcOptions={
"SubnetIds": subnet_ids,
"SecurityGroupIds": security_group_ids,
},
)
except Exception as e:
logger.error(f"Error updating VPC endpoint: {vpc_endpoint_id}")
logger.exception(e)
raise e
return response["VpcEndpoint"]["VpcEndpointId"]

@retry(
retry=retry_if_exception_type(AuthorizationException),
stop=stop_after_attempt(30),
wait=wait_exponential_jitter(1, 3),
)
def handle_delete(
client: OpenSearch,
vpc_endpoint_id: str,
):
try:
response = client.delete_vpc_endpoint(
VpcEndpointId=vpc_endpoint_id,
)
except Exception as e:
logger.error(f"Error deleting VPC endpoint: {vpc_endpoint_id}")
logger.exception(e)
raise e
return response["VpcEndpointSummary"]["VpcEndpointId"]

def on_create(event: CustomResourceRequest[VpcEndpointProperties]) -> CustomResourceResponse:
validate_event(event)
client = connect_opensearch(event["ResourceProperties"]["Endpoint"])
physical_id = handle_create(client,
event["ResourceProperties"]["DomainArn"],
event["ResourceProperties"]["SubnetIds"],
event["ResourceProperties"]["SecurityGroupIds"],
str(uuid.uuid4())
)
return {"PhysicalResourceId": physical_id}

def on_update(
event: CustomResourceRequest[VpcEndpointProperties],
) -> CustomResourceResponse:
validate_event(event)
client = connect_opensearch(event["ResourceProperties"]["Endpoint"])
physical_id = handle_update(client,
event["PhysicalResourceId"],
event["ResourceProperties"]["SubnetIds"],
event["ResourceProperties"]["SecurityGroupIds"]
)
return {"PhysicalResourceId": physical_id}
def on_delete(
event: CustomResourceRequest[VpcEndpointProperties],
) -> CustomResourceResponse:
validate_event(event)
client = connect_opensearch(event["ResourceProperties"]["Endpoint"])
pyhiscal_id = handle_delete(client, event["PhysicalResourceId"])

return {"PhysicalResourceId": pyhiscal_id}


def on_event(event, context):
logger.info(f"event: {event}")
request_type = event["RequestType"]
if request_type == "Create":
return on_create(event, context)
if request_type == "Update":
return on_update(event, context)
if request_type == "Delete":
return on_delete(event, context)
raise Exception("Invalid request type: %s" % request_type)
Loading

0 comments on commit 00367ad

Please sign in to comment.