From 13346f1f4e6e41b7c74b126d9a75ae7a28e6e9df Mon Sep 17 00:00:00 2001 From: Konstantinos Kopanidis Date: Thu, 19 Sep 2024 12:52:35 +0300 Subject: [PATCH] refactor: introduce grpc client params to enable LB, fix state storing inconsistencies. (#1157) * fix(authentication, authorization): teamMembers count, remove default pagination from rpc routes (#1161) * fix(authentication): count members users instead of relations * fix(authorization): remove rpc default limit * fix(admin,core,router): state publishing inconsistency * feat(grpc-sdk,hermes): round robin lb for grpc connections --------- Co-authored-by: Rena Tsagka <57152951+Renc17@users.noreply.github.com> --- .../grpc-sdk/src/classes/ConduitModule.ts | 2 ++ libraries/hermes/src/utils/GrpcConverter.ts | 1 + modules/authentication/src/utils/index.ts | 2 +- .../src/controllers/relations.controller.ts | 4 ++-- modules/router/src/Router.ts | 21 ++++++++++++++----- packages/admin/src/index.ts | 4 ++-- .../config-manager/service-discovery/index.ts | 4 ++-- 7 files changed, 26 insertions(+), 12 deletions(-) diff --git a/libraries/grpc-sdk/src/classes/ConduitModule.ts b/libraries/grpc-sdk/src/classes/ConduitModule.ts index d8b6e7807..858b62e9d 100644 --- a/libraries/grpc-sdk/src/classes/ConduitModule.ts +++ b/libraries/grpc-sdk/src/classes/ConduitModule.ts @@ -73,7 +73,9 @@ export class ConduitModule { this.channel = createChannel(this._serviceUrl, undefined, { 'grpc.max_receive_message_length': 1024 * 1024 * 100, 'grpc.max_send_message_length': 1024 * 1024 * 100, + 'grpc.service_config': '{"loadBalancingConfig":[{"round_robin":{}}]}', }); + let clientFactory = createClientFactory() .use( this._grpcToken diff --git a/libraries/hermes/src/utils/GrpcConverter.ts b/libraries/hermes/src/utils/GrpcConverter.ts index ec357f838..ea710a309 100644 --- a/libraries/hermes/src/utils/GrpcConverter.ts +++ b/libraries/hermes/src/utils/GrpcConverter.ts @@ -67,6 +67,7 @@ export function grpcToConduitRoute( { 'grpc.max_receive_message_length': 1024 * 1024 * 100, 'grpc.max_send_message_length': 1024 * 1024 * 100, + 'grpc.service_config': '{"loadBalancingConfig":[{"round_robin":{}}]}', }, ); diff --git a/modules/authentication/src/utils/index.ts b/modules/authentication/src/utils/index.ts index 0a68df0ef..84c377a85 100644 --- a/modules/authentication/src/utils/index.ts +++ b/modules/authentication/src/utils/index.ts @@ -160,7 +160,6 @@ export namespace AuthUtils { } } - const count = relations.relations.length; const members: (User & { role?: string })[] = await User.getInstance().findMany( query, undefined, @@ -169,6 +168,7 @@ export namespace AuthUtils { sort, populate, ); + const count = await User.getInstance().countDocuments(query); members.forEach(member => { // add role from relation to each member // find relation with member id diff --git a/modules/authorization/src/controllers/relations.controller.ts b/modules/authorization/src/controllers/relations.controller.ts index d316704e9..38f7aaeba 100644 --- a/modules/authorization/src/controllers/relations.controller.ts +++ b/modules/authorization/src/controllers/relations.controller.ts @@ -193,8 +193,8 @@ export class RelationsController { resourceType?: string; subjectType?: string; }, - skip: number = 0, - limit = 10, + skip?: number, + limit?: number, ) { const query: { subject?: string; diff --git a/modules/router/src/Router.ts b/modules/router/src/Router.ts index d4f647d0a..c2b1d578c 100644 --- a/modules/router/src/Router.ts +++ b/modules/router/src/Router.ts @@ -44,6 +44,7 @@ import * as adminRoutes from './admin/routes/index.js'; import metricsSchema from './metrics/index.js'; import { ConfigController, ManagedModule } from '@conduitplatform/module-tools'; import { fileURLToPath } from 'node:url'; + const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); @@ -161,9 +162,9 @@ export default class ConduitDefaultRouter extends ManagedModule { const messageParsed = JSON.parse(message); try { this.internalRegisterRoute( - messageParsed.protofile, messageParsed.routes, messageParsed.url, + messageParsed.moduleName, ); } catch (err) { ConduitGrpcSdk.Logger.error(err as Error); @@ -172,7 +173,11 @@ export default class ConduitDefaultRouter extends ManagedModule { this.scheduleMiddlewareApply(); } - updateState(routes: RegisterConduitRouteRequest_PathDefinition[], url: string) { + updateState( + routes: RegisterConduitRouteRequest_PathDefinition[], + url: string, + moduleName: string, + ) { this.grpcSdk .state!.modifyState(async (existingState: Indexable) => { const state = existingState ?? {}; @@ -189,12 +194,13 @@ export default class ConduitDefaultRouter extends ManagedModule { state.routes.push({ routes, url, + moduleName, }); } return state; }) .then(() => { - this.publishRouteData(routes, url); + this.publishRouteData(routes, url, moduleName); ConduitGrpcSdk.Logger.log('Updated routes state'); }) .catch(e => { @@ -203,12 +209,17 @@ export default class ConduitDefaultRouter extends ManagedModule { }); } - publishRouteData(routes: RegisterConduitRouteRequest_PathDefinition[], url: string) { + publishRouteData( + routes: RegisterConduitRouteRequest_PathDefinition[], + url: string, + moduleName: string, + ) { this.grpcSdk.bus!.publish( 'router', JSON.stringify({ routes, url, + moduleName, }), ); } @@ -253,7 +264,7 @@ export default class ConduitDefaultRouter extends ManagedModule { moduleName as string, ); - this.updateState(call.request.routes, call.request.routerUrl); + this.updateState(call.request.routes, call.request.routerUrl, moduleName as string); this.scheduleMiddlewareApply(); } catch (err) { ConduitGrpcSdk.Logger.error(err as Error); diff --git a/packages/admin/src/index.ts b/packages/admin/src/index.ts index 1a6aec727..11cc2e62d 100644 --- a/packages/admin/src/index.ts +++ b/packages/admin/src/index.ts @@ -383,7 +383,7 @@ export default class AdminModule extends IConduitAdmin { } }); } - return this.internalRegisterRoute(r.protofile, r.routes, r.url); + return this.internalRegisterRoute(r.routes, r.url); } catch (err) { ConduitGrpcSdk.Logger.error(err as Error); } @@ -418,9 +418,9 @@ export default class AdminModule extends IConduitAdmin { const messageParsed = JSON.parse(message); try { this.internalRegisterRoute( - messageParsed.protofile, messageParsed.routes, messageParsed.url, + messageParsed.moduleName, ); } catch (err) { ConduitGrpcSdk.Logger.error(err as Error); diff --git a/packages/core/src/config-manager/service-discovery/index.ts b/packages/core/src/config-manager/service-discovery/index.ts index 029c9261b..9ed5a2226 100644 --- a/packages/core/src/config-manager/service-discovery/index.ts +++ b/packages/core/src/config-manager/service-discovery/index.ts @@ -77,7 +77,7 @@ export class ServiceDiscovery { }); } } - this.grpcSdk.bus!.subscribe('config', (message: string) => { + this.grpcSdk.bus!.subscribe('service-discover', (message: string) => { const parsedMessage = JSON.parse(message); if (parsedMessage.type === 'module-health') { this._serviceMonitor.updateModuleHealth( @@ -233,7 +233,7 @@ export class ServiceDiscovery { status?: HealthCheckStatus, ) { this.grpcSdk.bus!.publish( - 'config', + 'service-discover', JSON.stringify({ type, name,