Skip to content

Commit

Permalink
unique destinationPush
Browse files Browse the repository at this point in the history
  • Loading branch information
alexanderkirtzel committed Dec 18, 2024
1 parent cb699a7 commit d44314b
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 59 deletions.
16 changes: 0 additions & 16 deletions packages/sources/node/src/__tests__/destination.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -433,20 +433,4 @@ describe('Destination', () => {
// timing: 0, // @TODO should be set to default type
});
});

test('loop prevention', async () => {
mockDestination.type = 'foo';
const { elb } = getSource({
destinations: { mockDestination },
});

await elb({ event: 'e a' });
await elb({ event: 'e a', source: { type: 'another one' } });
expect(mockPush).toHaveBeenCalledTimes(2);

jest.clearAllMocks();
await elb({ event: 'e a', source: { type: 'foo' } });

expect(mockPush).toHaveBeenCalledTimes(0);
});
});
6 changes: 1 addition & 5 deletions packages/sources/node/src/lib/destination.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,6 @@ export async function destinationPush(
}
}

// Prevent recursive loops between sources and destinations
if (isDefined(destination.type) && destination.type === event.source.type)
return false;

const options = { data, instance };

if (eventMapping?.batch && destination.pushBatch) {
Expand All @@ -120,7 +116,7 @@ export async function destinationPush(

eventMapping.batchFn =
eventMapping.batchFn ||
debounce(async (destination, instance) => {
debounce((destination, instance) => {
useHooks(
destination.pushBatch!,
'DestinationPushBatch',
Expand Down
73 changes: 35 additions & 38 deletions packages/sources/walkerjs/src/lib/destination.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import {
debounce,
getMappingEvent,
getId,
tryCatch,
useHooks,
getMappingValue,
isDefined,
Expand Down Expand Up @@ -133,42 +132,40 @@ export function destinationPush(

const options = { data, instance };

return !!tryCatch(() => {
if (eventMapping?.batch && destination.pushBatch) {
const batched = eventMapping.batched || {
key: mappingKey || '',
events: [],
data: [],
};
batched.events.push(event);
if (isDefined(data)) batched.data.push(data);

eventMapping.batchFn =
eventMapping.batchFn ||
debounce((destination, instance) => {
useHooks(
destination.pushBatch!,
'DestinationPushBatch',
instance.hooks,
)(batched, destination.config, options);

// Reset the batched queues
batched.events = [];
batched.data = [];
}, eventMapping.batch);

eventMapping.batched = batched;
eventMapping.batchFn(destination, instance);
} else {
// It's time to go to the destination's side now
useHooks(destination.push, 'DestinationPush', instance.hooks)(
event,
destination.config,
eventMapping,
options,
);
}
if (eventMapping?.batch && destination.pushBatch) {
const batched = eventMapping.batched || {
key: mappingKey || '',
events: [],
data: [],
};
batched.events.push(event);
if (isDefined(data)) batched.data.push(data);

eventMapping.batchFn =
eventMapping.batchFn ||
debounce((destination, instance) => {
useHooks(
destination.pushBatch!,
'DestinationPushBatch',
instance.hooks,
)(batched, destination.config, options);

// Reset the batched queues
batched.events = [];
batched.data = [];
}, eventMapping.batch);

eventMapping.batched = batched;
eventMapping.batchFn(destination, instance);
} else {
// It's time to go to the destination's side now
useHooks(destination.push, 'DestinationPush', instance.hooks)(
event,
destination.config,
eventMapping,
options,
);
}

return true;
})();
return true;
}

0 comments on commit d44314b

Please sign in to comment.