Skip to content

Commit

Permalink
refactor: introduce grpc client params to enable LB, fix state storin…
Browse files Browse the repository at this point in the history
…g inconsistencies. (#1157)

* fix(authentication, authorization): teamMembers count, remove default pagination from rpc routes (#1161)

* fix(authentication): count members users instead of relations

* fix(authorization): remove rpc default limit

* fix(admin,core,router): state publishing inconsistency

* feat(grpc-sdk,hermes): round robin lb for grpc connections

---------

Co-authored-by: Rena Tsagka <[email protected]>
  • Loading branch information
kkopanidis and Renc17 authored Sep 19, 2024
1 parent 1f50ca6 commit 13346f1
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 12 deletions.
2 changes: 2 additions & 0 deletions libraries/grpc-sdk/src/classes/ConduitModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ export class ConduitModule<T extends CompatServiceDefinition> {
this.channel = createChannel(this._serviceUrl, undefined, {
'grpc.max_receive_message_length': 1024 * 1024 * 100,
'grpc.max_send_message_length': 1024 * 1024 * 100,
'grpc.service_config': '{"loadBalancingConfig":[{"round_robin":{}}]}',
});

let clientFactory = createClientFactory()
.use(
this._grpcToken
Expand Down
1 change: 1 addition & 0 deletions libraries/hermes/src/utils/GrpcConverter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ export function grpcToConduitRoute(
{
'grpc.max_receive_message_length': 1024 * 1024 * 100,
'grpc.max_send_message_length': 1024 * 1024 * 100,
'grpc.service_config': '{"loadBalancingConfig":[{"round_robin":{}}]}',
},
);

Expand Down
2 changes: 1 addition & 1 deletion modules/authentication/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ export namespace AuthUtils {
}
}

const count = relations.relations.length;
const members: (User & { role?: string })[] = await User.getInstance().findMany(
query,
undefined,
Expand All @@ -169,6 +168,7 @@ export namespace AuthUtils {
sort,
populate,
);
const count = await User.getInstance().countDocuments(query);
members.forEach(member => {
// add role from relation to each member
// find relation with member id
Expand Down
4 changes: 2 additions & 2 deletions modules/authorization/src/controllers/relations.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ export class RelationsController {
resourceType?: string;
subjectType?: string;
},
skip: number = 0,
limit = 10,
skip?: number,
limit?: number,
) {
const query: {
subject?: string;
Expand Down
21 changes: 16 additions & 5 deletions modules/router/src/Router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import * as adminRoutes from './admin/routes/index.js';
import metricsSchema from './metrics/index.js';
import { ConfigController, ManagedModule } from '@conduitplatform/module-tools';
import { fileURLToPath } from 'node:url';

const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);

Expand Down Expand Up @@ -161,9 +162,9 @@ export default class ConduitDefaultRouter extends ManagedModule<Config> {
const messageParsed = JSON.parse(message);
try {
this.internalRegisterRoute(
messageParsed.protofile,
messageParsed.routes,
messageParsed.url,
messageParsed.moduleName,
);
} catch (err) {
ConduitGrpcSdk.Logger.error(err as Error);
Expand All @@ -172,7 +173,11 @@ export default class ConduitDefaultRouter extends ManagedModule<Config> {
this.scheduleMiddlewareApply();
}

updateState(routes: RegisterConduitRouteRequest_PathDefinition[], url: string) {
updateState(
routes: RegisterConduitRouteRequest_PathDefinition[],
url: string,
moduleName: string,
) {
this.grpcSdk
.state!.modifyState(async (existingState: Indexable) => {
const state = existingState ?? {};
Expand All @@ -189,12 +194,13 @@ export default class ConduitDefaultRouter extends ManagedModule<Config> {
state.routes.push({
routes,
url,
moduleName,
});
}
return state;
})
.then(() => {
this.publishRouteData(routes, url);
this.publishRouteData(routes, url, moduleName);
ConduitGrpcSdk.Logger.log('Updated routes state');
})
.catch(e => {
Expand All @@ -203,12 +209,17 @@ export default class ConduitDefaultRouter extends ManagedModule<Config> {
});
}

publishRouteData(routes: RegisterConduitRouteRequest_PathDefinition[], url: string) {
publishRouteData(
routes: RegisterConduitRouteRequest_PathDefinition[],
url: string,
moduleName: string,
) {
this.grpcSdk.bus!.publish(
'router',
JSON.stringify({
routes,
url,
moduleName,
}),
);
}
Expand Down Expand Up @@ -253,7 +264,7 @@ export default class ConduitDefaultRouter extends ManagedModule<Config> {
moduleName as string,
);

this.updateState(call.request.routes, call.request.routerUrl);
this.updateState(call.request.routes, call.request.routerUrl, moduleName as string);
this.scheduleMiddlewareApply();
} catch (err) {
ConduitGrpcSdk.Logger.error(err as Error);
Expand Down
4 changes: 2 additions & 2 deletions packages/admin/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ export default class AdminModule extends IConduitAdmin {
}
});
}
return this.internalRegisterRoute(r.protofile, r.routes, r.url);
return this.internalRegisterRoute(r.routes, r.url);
} catch (err) {
ConduitGrpcSdk.Logger.error(err as Error);
}
Expand Down Expand Up @@ -418,9 +418,9 @@ export default class AdminModule extends IConduitAdmin {
const messageParsed = JSON.parse(message);
try {
this.internalRegisterRoute(
messageParsed.protofile,
messageParsed.routes,
messageParsed.url,
messageParsed.moduleName,
);
} catch (err) {
ConduitGrpcSdk.Logger.error(err as Error);
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/config-manager/service-discovery/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export class ServiceDiscovery {
});
}
}
this.grpcSdk.bus!.subscribe('config', (message: string) => {
this.grpcSdk.bus!.subscribe('service-discover', (message: string) => {
const parsedMessage = JSON.parse(message);
if (parsedMessage.type === 'module-health') {
this._serviceMonitor.updateModuleHealth(
Expand Down Expand Up @@ -233,7 +233,7 @@ export class ServiceDiscovery {
status?: HealthCheckStatus,
) {
this.grpcSdk.bus!.publish(
'config',
'service-discover',
JSON.stringify({
type,
name,
Expand Down

0 comments on commit 13346f1

Please sign in to comment.