Skip to content

Commit

Permalink
feat(email): store emails to db/storage (#1091)
Browse files Browse the repository at this point in the history
* feat: store emails

feat: store emails in database or storage & cleanup job

feat: resend email

chore: remove comments

* fix: revert yarn.lock

* fix: found bugs during testing

* fix: fix emails stored in storage & resend bugs

* fix: bugs related to messageId & email status

* refactor: double if

* refactor: discussed changes

* refactor: util import

* refactor: discussed changes

* refactor: remove populate

* refactor: resend using emailRecordId instead of messageId
  • Loading branch information
ChrisPdgn authored Jul 24, 2024
1 parent b407dbe commit 3e2792c
Show file tree
Hide file tree
Showing 20 changed files with 576 additions and 17 deletions.
12 changes: 12 additions & 0 deletions libraries/grpc-sdk/src/modules/email/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,16 @@ export class Email extends ConduitModule<typeof EmailDefinition> {
return res.sentMessageInfo;
});
}

resendEmail(emailRecordId: string) {
return this.client!.resendEmail({ emailRecordId }).then(res => {
return res.sentMessageInfo;
});
}

getEmailStatus(messageId: string) {
return this.client!.getEmailStatus({ messageId }).then(res => {
return JSON.parse(res.statusInfo);
});
}
}
2 changes: 2 additions & 0 deletions modules/email/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
"@sendgrid/client": "^8.0.0",
"@types/nodemailer-sendgrid": "^1.0.3",
"await-to-js": "^3.0.0",
"axios": "^1.7.2",
"bullmq": "^5.4.3",
"convict": "^6.2.4",
"escape-string-regexp": "^4.0.0",
"handlebars": "^4.7.8",
Expand Down
98 changes: 97 additions & 1 deletion modules/email/src/Email.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,21 @@ import { isNil } from 'lodash-es';
import { status } from '@grpc/grpc-js';
import { runMigrations } from './migrations/index.js';
import {
GetEmailStatusRequest,
GetEmailStatusResponse,
RegisterTemplateRequest,
RegisterTemplateResponse,
ResendEmailRequest,
ResendEmailResponse,
SendEmailRequest,
SendEmailResponse,
} from './protoTypes/email.js';
import metricsSchema from './metrics/index.js';
import { ConfigController, ManagedModule } from '@conduitplatform/module-tools';
import { ISendEmailParams } from './interfaces/index.js';
import { fileURLToPath } from 'node:url';
import { Queue, Worker } from 'bullmq';
import { Cluster, Redis } from 'ioredis';
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);

Expand All @@ -35,6 +41,8 @@ export default class Email extends ManagedModule<Config> {
functions: {
registerTemplate: this.registerTemplate.bind(this),
sendEmail: this.sendEmail.bind(this),
resendEmail: this.resendEmail.bind(this),
getEmailStatus: this.getEmailStatus.bind(this),
},
};
protected metricsSchema = metricsSchema;
Expand All @@ -43,6 +51,8 @@ export default class Email extends ManagedModule<Config> {
private database: DatabaseProvider;
private emailProvider: EmailProvider;
private emailService: EmailService;
private redisConnection: Redis | Cluster;
private emailCleanupQueue: Queue | null = null;

constructor() {
super('email');
Expand All @@ -54,6 +64,7 @@ export default class Email extends ManagedModule<Config> {
this.database = this.grpcSdk.database!;
await this.registerSchemas();
await runMigrations(this.grpcSdk);
this.redisConnection = this.grpcSdk.redisManager.getClient();
}

async preConfig(config: Config) {
Expand All @@ -73,7 +84,7 @@ export default class Email extends ManagedModule<Config> {
} else {
if (!this.isRunning) {
await this.initEmailProvider();
this.emailService = new EmailService(this.emailProvider);
this.emailService = new EmailService(this.grpcSdk, this.emailProvider);
this.adminRouter = new AdminHandlers(this.grpcServer, this.grpcSdk);
this.adminRouter.setEmailService(this.emailService);
this.isRunning = true;
Expand All @@ -82,6 +93,14 @@ export default class Email extends ManagedModule<Config> {
this.emailService.updateProvider(this.emailProvider);
}
this.updateHealth(HealthCheckStatus.SERVING);

const config = ConfigController.getInstance().config as Config;
if (config.storeEmails.storage.enabled && !this.grpcSdk.isAvailable('storage')) {
ConduitGrpcSdk.Logger.warn(
'Failed to enable email storing. Storage module not serving.',
);
}
await this.handleEmailCleanupJob(config);
}
}

Expand Down Expand Up @@ -139,6 +158,32 @@ export default class Email extends ManagedModule<Config> {
return callback(null, { sentMessageInfo });
}

async resendEmail(
call: GrpcRequest<ResendEmailRequest>,
callback: GrpcCallback<ResendEmailResponse>,
) {
let errorMessage: string | null = null;
const sentMessageInfo = await this.emailService
.resendEmail(call.request.emailRecordId)
.catch((e: Error) => (errorMessage = e.message));
if (!isNil(errorMessage))
return callback({ code: status.INTERNAL, message: errorMessage });
return callback(null, { sentMessageInfo });
}

async getEmailStatus(
call: GrpcRequest<GetEmailStatusRequest>,
callback: GrpcCallback<GetEmailStatusResponse>,
) {
let errorMessage: string | null = null;
const statusInfo = await this.emailService
.getEmailStatus(call.request.messageId)
.catch((e: Error) => (errorMessage = e.message));
if (!isNil(errorMessage))
return callback({ code: status.INTERNAL, message: errorMessage });
return callback(null, { statusInfo: JSON.stringify(statusInfo) });
}

protected registerSchemas(): Promise<unknown> {
const promises = Object.values(models).map(model => {
const modelInstance = model.getInstance(this.database);
Expand All @@ -158,4 +203,55 @@ export default class Email extends ManagedModule<Config> {

this.emailProvider = new EmailProvider(transport, transportSettings);
}

private async handleEmailCleanupJob(config: Config) {
this.emailCleanupQueue = new Queue('email-cleanup-queue', {
connection: this.redisConnection,
});
await this.emailCleanupQueue.drain(true);
if (!config.storeEmails.enabled || !config.storeEmails.cleanupSettings.enabled) {
await this.emailCleanupQueue.close();
return;
}
const processorFile = path.normalize(
path.join(__dirname, 'jobs', 'cleanupStoredEmails.js'),
);
const worker = new Worker('email-cleanup-queue', processorFile, {
connection: this.redisConnection,
removeOnComplete: {
age: 3600,
count: 1000,
},
removeOnFail: {
age: 24 * 3600,
},
});
worker.on('active', job => {
ConduitGrpcSdk.Logger.info(`Stored email cleanup job ${job.id} started`);
});
worker.on('completed', () => {
ConduitGrpcSdk.Logger.info(`Stored email cleanup completed`);
});
worker.on('error', (error: Error) => {
ConduitGrpcSdk.Logger.error(`Stored email cleanup error:`);
ConduitGrpcSdk.Logger.error(error);
});

worker.on('failed', (_job, error) => {
ConduitGrpcSdk.Logger.error(`Stored email cleanup error:`);
ConduitGrpcSdk.Logger.error(error);
});
await this.emailCleanupQueue.add(
'cleanup',
{
limit: config.storeEmails.cleanupSettings.limit,
deleteStorageFiles: config.storeEmails.storage.enabled,
},
{
repeat: {
every: config.storeEmails.cleanupSettings.repeat,
},
},
);
}
}
108 changes: 105 additions & 3 deletions modules/email/src/admin/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
} from '@conduitplatform/grpc-sdk';
import {
ConduitBoolean,
ConduitDate,
ConduitJson,
ConduitNumber,
ConduitString,
Expand All @@ -21,7 +22,7 @@ import { to } from 'await-to-js';
import { isNil } from 'lodash-es';
import { getHandleBarsValues } from '../email-provider/utils/index.js';
import { EmailService } from '../services/email.service.js';
import { EmailTemplate } from '../models/index.js';
import { EmailRecord, EmailTemplate } from '../models/index.js';
import { Config } from '../config/index.js';
import { Template } from '../email-provider/interfaces/Template.js';
import { TemplateDocument } from '../email-provider/interfaces/TemplateDocument.js';
Expand Down Expand Up @@ -364,7 +365,7 @@ export class AdminHandlers {
}
}

await this.emailService
const sentEmailInfo = await this.emailService
.sendEmail(templateName, {
body,
subject,
Expand All @@ -379,7 +380,55 @@ export class AdminHandlers {
throw new GrpcError(status.INTERNAL, e.message);
});
ConduitGrpcSdk.Metrics?.increment('emails_sent_total');
return { message: 'Email sent' };
return { message: sentEmailInfo.messageId ?? 'Email sent' };
}

async resendEmail(call: ParsedRouterRequest): Promise<UnparsedRouterResponse> {
return (await this.emailService.resendEmail(
call.request.params.emailRecordId,
)) as UnparsedRouterResponse;
}

async getEmailStatus(call: ParsedRouterRequest): Promise<UnparsedRouterResponse> {
const statusInfo = await this.emailService.getEmailStatus(
call.request.params.messageId,
);
return { statusInfo };
}

async getEmailRecords(call: ParsedRouterRequest): Promise<UnparsedRouterResponse> {
const {
messageId,
templateId,
receiver,
sender,
cc,
replyTo,
startDate,
endDate,
skip,
limit,
sort,
} = call.request.params;
const query: Query<EmailRecord> = {
...(messageId ? { messageId } : {}),
...(templateId ? { templateId } : {}),
...(receiver ? { receiver } : {}),
...(sender ? { sender } : {}),
...(cc ? { cc: { $in: cc } } : {}),
...(replyTo ? { replyTo } : {}),
...(startDate ? { createdAt: { $gte: startDate } } : {}),
...(endDate ? { createdAt: { $lte: endDate } } : {}),
};
const records = await EmailRecord.getInstance().findMany(
query,
undefined,
skip,
limit,
sort,
);
const count = await EmailRecord.getInstance().countDocuments(query);
return { records, count };
}

private registerAdminRoutes() {
Expand Down Expand Up @@ -530,6 +579,59 @@ export class AdminHandlers {
}),
this.sendEmail.bind(this),
);
this.routingManager.route(
{
path: '/resend',
action: ConduitRouteActions.POST,
description: `Resends an email (only if stored in storage).`,
bodyParams: {
emailRecordId: ConduitString.Required,
},
},
new ConduitRouteReturnDefinition('Resend an email', {
message: ConduitString.Required,
}),
this.resendEmail.bind(this),
);
this.routingManager.route(
{
path: '/status',
action: ConduitRouteActions.GET,
description: `Returns the latest status of a sent email.`,
queryParams: {
messageId: ConduitString.Required,
},
},
new ConduitRouteReturnDefinition('GetEmailStatus', {
statusInfo: ConduitJson.Required,
}),
this.getEmailStatus.bind(this),
);
this.routingManager.route(
{
path: '/record',
action: ConduitRouteActions.GET,
description: `Returns records of stored sent emails.`,
queryParams: {
messageId: ConduitString.Optional,
templateId: ConduitString.Optional,
receiver: ConduitString.Optional,
sender: ConduitString.Optional,
cc: [ConduitString.Optional],
replyTo: ConduitString.Optional,
startDate: ConduitDate.Optional,
endDate: ConduitDate.Optional,
skip: ConduitNumber.Optional,
limit: ConduitNumber.Optional,
sort: ConduitString.Optional,
},
},
new ConduitRouteReturnDefinition('GetEmailRecords', {
records: [EmailRecord.name],
count: ConduitNumber.Required,
}),
this.getEmailRecords.bind(this),
);
this.routingManager.registerRoutes();
}
}
41 changes: 41 additions & 0 deletions modules/email/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,45 @@ export default {
},
},
},
storeEmails: {
enabled: {
doc: 'Defines if sent email info should be stored in database',
format: 'Boolean',
default: false,
},
storage: {
enabled: {
doc: 'Defines if email content should be stored in storage',
format: 'Boolean',
default: false,
},
container: {
doc: 'The storage container for emails',
format: 'String',
default: 'conduit',
},
folder: {
doc: 'The storage folder for emails',
format: 'String',
default: 'cnd-stored-emails',
},
},
cleanupSettings: {
enabled: {
doc: 'Settings for deleting old stored emails',
format: 'Boolean',
default: false,
},
repeat: {
doc: 'Time in milliseconds to repeat the cleanup job',
format: 'Number',
default: 6 * 60 * 60 * 1000,
},
limit: {
doc: 'Amount of stored emails to be deleted upon cleanup',
format: 'Number',
default: 100,
},
},
},
};
8 changes: 8 additions & 0 deletions modules/email/src/email-provider/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { MandrillProvider } from './transports/mandrill/MandrilProvider.js';
import { SendgridProvider } from './transports/sendgrid/SendgridProvider.js';
import { SmtpProvider } from './transports/smtp/SmtpProvider.js';
import { ConfigController } from '@conduitplatform/module-tools';
import { Indexable } from '@conduitplatform/grpc-sdk';

export class EmailProvider {
_transport?: EmailProviderClass;
Expand Down Expand Up @@ -90,4 +91,11 @@ export class EmailProvider {
}
return this._transport.sendEmail(email.getMailObject());
}

getEmailStatus(messageId: string): Promise<Indexable> {
if (!this._transport) {
throw new Error('Email transport not initialized!');
}
return this._transport.getEmailStatus(messageId);
}
}
Loading

0 comments on commit 3e2792c

Please sign in to comment.