Skip to content

Commit

Permalink
Fix Procedure level queue (#1946)
Browse files Browse the repository at this point in the history
Refs: #1898
Closes: #1945
PR-URL: #1946
  • Loading branch information
KLarpen authored Dec 19, 2023
1 parent f17652a commit fa4d0bb
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## [Unreleased][unreleased]

- Fixed API endpoints local queue settings applying

## [3.0.13][] - 2023-10-22

- Fix serve static not in cache (e.g. certbot challenge)
Expand Down
4 changes: 2 additions & 2 deletions lib/procedure.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class Procedure {

async enter() {
await this.application.semaphore.enter();
if (this.concurrency) {
if (this.semaphore) {
try {
await this.semaphore.enter();
} catch (error) {
Expand All @@ -53,7 +53,7 @@ class Procedure {

leave() {
this.application.semaphore.leave();
if (this.concurrency) this.semaphore.leave();
if (this.semaphore) this.semaphore.leave();
}

async invoke(context, args = {}) {
Expand Down
69 changes: 69 additions & 0 deletions test/procedure.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,72 @@ metatests.testAsync('lib/procedure timeout', async (test) => {

await test.resolves(() => procedure.invoke({}, { waitTime: 50 }), DONE);
});

metatests.testAsync('lib/procedure queue', async (test) => {
const DONE = 'success';

const script = () => ({
queue: {
concurrency: 1,
size: 1,
timeout: 15,
},

method: async ({ waitTime }) =>
new Promise((resolve) => {
setTimeout(() => resolve(DONE), waitTime);
}),
});

const application = {
Error,
semaphore: {
async enter() {},
leave() {},
},
};

const rpc = async (proc, args) => {
let result = null;
await proc.enter();
try {
result = await proc.invoke({}, args);
} catch (error) {
throw new Error('Procedure.invoke failed. Check your script.method');
}
proc.leave();
return result;
};

const procedure = new Procedure(script, 'method', application);

await test.resolves(async () => {
const invokes = await Promise.allSettled([
rpc(procedure, { waitTime: 2 }),
rpc(procedure, { waitTime: 1 }),
]);
const last = invokes[1];
return last.value;
}, DONE);

await test.rejects(async () => {
const invokes = await Promise.allSettled([
rpc(procedure, { waitTime: 16 }),
rpc(procedure, { waitTime: 1 }),
]);
const last = invokes[1];
if (last.status === 'rejected') throw last.reason;
return last.value;
}, new Error('Semaphore timeout'));

await test.rejects(async () => {
const invokes = await Promise.allSettled([
rpc(procedure, { waitTime: 1 }),
rpc(procedure, { waitTime: 1 }),
rpc(procedure, { waitTime: 1 }),
]);
const last = invokes[2];
if (last.status === 'rejected') throw last.reason;
return last.value;
}, new Error('Semaphore queue is full'));
});

0 comments on commit fa4d0bb

Please sign in to comment.