Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Overwriting of cached task outputs for a single execution #minor #616

Merged
merged 5 commits into from
Jan 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
.env
*.swp
# direnv
.envrc

# C extensions
*.so
Expand Down
9 changes: 6 additions & 3 deletions packages/zapp/console/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ if (env.ADMIN_API_USE_SSL === 'https') {
});
}

process.on('SIGTERM', () => {
console.info('SIGTERM signal received. Shutting down.');
function shutdown(signal) {
console.info(`${signal} signal received. Shutting down.`);
server.close((error) => {
if (error) {
console.error('Failed to close server:', error);
Expand All @@ -74,4 +74,7 @@ process.on('SIGTERM', () => {
console.log('Server closed');
process.exit(0);
});
});
}

process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
16 changes: 12 additions & 4 deletions packages/zapp/console/src/common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@ export function isValidDate(input: string | Date): boolean {
/** Converts a Protobuf Timestamp object to a JS Date */
export function timestampToDate(timestamp: Protobuf.ITimestamp): Date {
const nanos = timestamp.nanos || 0;
const milliseconds = (timestamp.seconds as Long).toNumber() * 1000 + nanos / 1e6;
const seconds =
typeof timestamp.seconds === 'number'
? timestamp.seconds
: (timestamp.seconds as Long).toNumber();
const milliseconds = seconds * 1000 + nanos / 1e6;
return new Date(milliseconds);
}

/** A sort comparison function for ordering timestamps in ascending progression */
export function compareTimestampsAscending(a: Protobuf.ITimestamp, b: Protobuf.ITimestamp) {
const leftSeconds: Long = a.seconds || Long.fromNumber(0);
const leftSeconds: Long =
(typeof a.seconds === 'number' ? Long.fromNumber(a.seconds) : a.seconds) || Long.fromNumber(0);
const leftNanos: number = a.nanos || 0;
const rightSeconds: Long = b.seconds || Long.fromNumber(0);
const rightSeconds: Long =
(typeof b.seconds === 'number' ? Long.fromNumber(b.seconds) : b.seconds) || Long.fromNumber(0);
const rightNanos: number = b.nanos || 0;
if (leftSeconds.eq(rightSeconds)) {
return leftNanos - rightNanos;
Expand All @@ -44,7 +50,9 @@ export function dateToTimestamp(date: Date): Protobuf.Timestamp {
/** Converts a Protobuf Duration object to its equivalent value in milliseconds */
export function durationToMilliseconds(duration: Protobuf.IDuration): number {
const nanos = duration.nanos || 0;
return (duration.seconds as Long).toNumber() * 1000 + nanos / 1e6;
const seconds =
typeof duration.seconds === 'number' ? duration.seconds : (duration.seconds as Long).toNumber();
return seconds * 1000 + nanos / 1e6;
}

/** Converts a (possibly fractional) value in milliseconds to a Protobuf Duration object */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ const NodeExecutionCacheStatusIcon: React.FC<
> = React.forwardRef(({ status, ...props }, ref) => {
switch (status) {
case CatalogCacheStatus.CACHE_DISABLED:
case CatalogCacheStatus.CACHE_MISS: {
case CatalogCacheStatus.CACHE_MISS:
case CatalogCacheStatus.CACHE_SKIPPED: {
return <InfoOutlined {...props} ref={ref} data-testid="cache-icon" />;
}
case CatalogCacheStatus.CACHE_HIT: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export const ExecutionMetadataExtra: React.FC<{
rawOutputDataConfig,
securityContext,
interruptible,
overwriteCache,
} = execution.spec;

const [launchPlanSpec, setLaunchPlanSpec] = React.useState<Partial<LaunchPlanSpec>>({});
Expand Down Expand Up @@ -71,6 +72,10 @@ export const ExecutionMetadataExtra: React.FC<{
label: ExecutionMetadataLabels.interruptible,
value: interruptible ? (interruptible.value ? 'true' : 'false') : dashedValueString,
},
{
label: ExecutionMetadataLabels.overwriteCache,
value: overwriteCache ? 'true' : 'false',
},
];

return (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ function useRelaunchWorkflowFormState({ execution }: RelaunchExecutionFormProps)
authRole,
securityContext,
interruptible,
overwriteCache,
},
} = execution;

Expand All @@ -61,6 +62,7 @@ function useRelaunchWorkflowFormState({ execution }: RelaunchExecutionFormProps)
authRole,
securityContext,
interruptible,
overwriteCache,
};
},
},
Expand All @@ -76,7 +78,7 @@ function useRelaunchTaskFormState({ execution }: RelaunchExecutionFormProps) {
defaultValue: {} as TaskInitialLaunchParameters,
doFetch: async (execution) => {
const {
spec: { authRole, launchPlan: taskId, interruptible },
spec: { authRole, launchPlan: taskId, interruptible, overwriteCache },
} = execution;
const task = await apiContext.getTask(taskId);
const inputDefinitions = getTaskInputs(task);
Expand All @@ -87,7 +89,7 @@ function useRelaunchTaskFormState({ execution }: RelaunchExecutionFormProps) {
},
apiContext,
);
return { authRole, values, taskId, interruptible };
return { authRole, values, taskId, interruptible, overwriteCache };
},
},
execution,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export enum ExecutionMetadataLabels {
parallelism = 'Parallelism',
securityContextDefault = 'default',
interruptible = 'Interruptible override',
overwriteCache = 'Overwrite cached outputs',
}

export const tabs = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const clusterTestId = `metadata-${ExecutionMetadataLabels.cluster}`;
const startTimeTestId = `metadata-${ExecutionMetadataLabels.time}`;
const durationTestId = `metadata-${ExecutionMetadataLabels.duration}`;
const interruptibleTestId = `metadata-${ExecutionMetadataLabels.interruptible}`;
const overwriteCacheTestId = `metadata-${ExecutionMetadataLabels.overwriteCache}`;

jest.mock('models/Launch/api', () => ({
getLaunchPlan: jest.fn(() => Promise.resolve({ spec: {} })),
Expand Down Expand Up @@ -95,4 +96,22 @@ describe('ExecutionMetadata', () => {
const { getByTestId } = renderMetadata();
expect(getByTestId(interruptibleTestId)).toHaveTextContent(dashedValueString);
});

it('shows true if cache was overwritten for execution', () => {
execution.spec.overwriteCache = true;
const { getByTestId } = renderMetadata();
expect(getByTestId(overwriteCacheTestId)).toHaveTextContent('true');
});

it('shows false if cache was not overwritten for execution', () => {
execution.spec.overwriteCache = false;
const { getByTestId } = renderMetadata();
expect(getByTestId(overwriteCacheTestId)).toHaveTextContent('false');
});

it('shows false if no cache overwrite value is found in execution spec', () => {
delete execution.spec.overwriteCache;
const { getByTestId } = renderMetadata();
expect(getByTestId(overwriteCacheTestId)).toHaveTextContent('false');
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,39 @@ describe('RelaunchExecutionForm', () => {
}),
});
});

it('should not set cache overwrite value if not provided', async () => {
delete execution.spec.overwriteCache;
const { getByText } = renderForm();
await waitFor(() => expect(getByText(mockContentString)));
checkLaunchFormProps({
initialParameters: expect.objectContaining({
overwriteCache: undefined,
}),
});
});

it('should have correct cache overwrite value if override is enabled', async () => {
execution.spec.overwriteCache = true;
const { getByText } = renderForm();
await waitFor(() => expect(getByText(mockContentString)));
checkLaunchFormProps({
initialParameters: expect.objectContaining({
overwriteCache: true,
}),
});
});

it('should have correct cache overwrite value if override is disabled', async () => {
execution.spec.overwriteCache = false;
const { getByText } = renderForm();
await waitFor(() => expect(getByText(mockContentString)));
checkLaunchFormProps({
initialParameters: expect.objectContaining({
overwriteCache: undefined,
}),
});
});
});

describe('Launch form with full inputs', () => {
Expand Down Expand Up @@ -322,5 +355,38 @@ describe('RelaunchExecutionForm', () => {
}),
});
});

it('should not set cache overwrite value if not provided', async () => {
delete execution.spec.overwriteCache;
const { getByText } = renderForm();
await waitFor(() => expect(getByText(mockContentString)));
checkLaunchFormProps({
initialParameters: expect.objectContaining({
overwriteCache: undefined,
}),
});
});

it('should have correct cache overwrite value if override is enabled', async () => {
execution.spec.overwriteCache = true;
const { getByText } = renderForm();
await waitFor(() => expect(getByText(mockContentString)));
checkLaunchFormProps({
initialParameters: expect.objectContaining({
overwriteCache: true,
}),
});
});

it('should have correct cache overwrite value if override is disabled', async () => {
execution.spec.overwriteCache = false;
const { getByText } = renderForm();
await waitFor(() => expect(getByText(mockContentString)));
checkLaunchFormProps({
initialParameters: expect.objectContaining({
overwriteCache: false,
}),
});
});
});
});
2 changes: 1 addition & 1 deletion packages/zapp/console/src/components/Executions/strings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const str = {
cachePopulatedMessage: 'The result of this execution was written to cache.',
cachePutFailure: 'Failed to write output for this execution to cache.',
mapCacheMessage: "Check the detail panel for each task's cache status.",
cacheSkippedMessage: 'Cache skipped.',
cacheSkippedMessage: 'Cache was skipped for this execution.',
unknownCacheStatusString: 'Cache status is unknown',
viewSourceExecutionString: 'View source execution',
fromCache: 'From cache',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { Typography } from '@material-ui/core';
import FormControlLabel from '@material-ui/core/FormControlLabel';
import Checkbox from '@material-ui/core/Checkbox';
import * as React from 'react';
import { useStyles } from './styles';
import { LaunchOverwriteCacheInputRef } from './types';
import t from './strings';

const isValueValid = (value: any) => {
return value !== undefined && value !== null;
};

interface LaunchOverwriteCacheInputProps {
initialValue?: boolean | null;
}

export const LaunchOverwriteCacheInputImpl: React.ForwardRefRenderFunction<
LaunchOverwriteCacheInputRef,
LaunchOverwriteCacheInputProps
> = (props, ref) => {
// overwriteCache stores the override to enable/disable the setting for an execution
const [overwriteCache, setOverwriteCache] = React.useState(false);

React.useEffect(() => {
if (isValueValid(props.initialValue)) {
setOverwriteCache(() => props.initialValue!);
}
}, [props.initialValue]);

const handleInputChange = React.useCallback(() => {
setOverwriteCache((prevState) => !prevState);
}, [overwriteCache]);

React.useImperativeHandle(
ref,
() => ({
getValue: () => {
return overwriteCache;
},
validate: () => true,
}),
[overwriteCache],
);

const styles = useStyles();

return (
<section>
<header className={styles.sectionHeader}>
<Typography variant="h6">Caching</Typography>
<Typography variant="body2">
Enabling the cache overwrite causes Flyte to ignore all previously computed and stored
outputs for a single execution and run all calculations again, overwriting any cached data
after a successful execution.
</Typography>
</header>
<section title={t('overwriteCache')}>
<FormControlLabel
control={<Checkbox checked={overwriteCache} onChange={handleInputChange} />}
label={t('overwriteCache')}
/>
</section>
</section>
);
};

export const LaunchOverwriteCacheInput = React.forwardRef(LaunchOverwriteCacheInputImpl);
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { LaunchFormInputs } from './LaunchFormInputs';
import { LaunchState } from './launchMachine';
import { LaunchRoleInput } from './LaunchRoleInput';
import { LaunchInterruptibleInput } from './LaunchInterruptibleInput';
import { LaunchOverwriteCacheInput } from './LaunchOverwriteCacheInput';
import { SearchableSelector } from './SearchableSelector';
import { useStyles } from './styles';
import { BaseInterpretedLaunchState, BaseLaunchService, LaunchTaskFormProps } from './types';
Expand All @@ -20,6 +21,7 @@ export const LaunchTaskForm: React.FC<LaunchTaskFormProps> = (props) => {
formInputsRef,
roleInputRef,
interruptibleInputRef,
overwriteCacheInputRef,
state,
service,
taskSourceSelectorState,
Expand Down Expand Up @@ -81,6 +83,10 @@ export const LaunchTaskForm: React.FC<LaunchTaskFormProps> = (props) => {
initialValue={state.context.interruptible}
ref={interruptibleInputRef}
/>
<LaunchOverwriteCacheInput
initialValue={state.context.overwriteCache}
ref={overwriteCacheInputRef}
/>
</DialogContent>
<LaunchFormActions
state={baseState}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { isEnterInputsState } from './utils';
import { LaunchRoleInput } from './LaunchRoleInput';
import { LaunchFormAdvancedInputs } from './LaunchFormAdvancedInputs';
import { LaunchInterruptibleInput } from './LaunchInterruptibleInput';
import { LaunchOverwriteCacheInput } from './LaunchOverwriteCacheInput';

/** Renders the form for initiating a Launch request based on a Workflow */
export const LaunchWorkflowForm: React.FC<LaunchWorkflowFormProps> = (props) => {
Expand All @@ -22,6 +23,7 @@ export const LaunchWorkflowForm: React.FC<LaunchWorkflowFormProps> = (props) =>
roleInputRef,
advancedOptionsRef,
interruptibleInputRef,
overwriteCacheInputRef,
state,
service,
workflowSourceSelectorState,
Expand Down Expand Up @@ -124,6 +126,10 @@ export const LaunchWorkflowForm: React.FC<LaunchWorkflowFormProps> = (props) =>
initialValue={state.context.interruptible}
ref={interruptibleInputRef}
/>
<LaunchOverwriteCacheInput
initialValue={state.context.overwriteCache}
ref={overwriteCacheInputRef}
/>
</AccordionDetails>
</Accordion>
</DialogContent>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ export interface WorkflowLaunchContext extends BaseLaunchContext {
annotations?: Admin.IAnnotations | null;
securityContext?: Core.ISecurityContext | null;
interruptible?: Protobuf.IBoolValue | null;
overwriteCache?: boolean | null;
}

export interface TaskLaunchContext extends BaseLaunchContext {
Expand All @@ -92,6 +93,7 @@ export interface TaskLaunchContext extends BaseLaunchContext {
taskVersion?: Identifier;
taskVersionOptions?: Task[];
interruptible?: Protobuf.IBoolValue | null;
overwriteCache?: boolean | null;
}

export interface TaskResumeContext extends BaseLaunchContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const str = {
launchPlan: 'Launch Plan',
interruptible: 'Interruptible',
viewNodeInputs: 'View node inputs',
overwriteCache: 'Overwrite cached outputs',
};

export { patternKey } from '@flyteconsole/locale';
Expand Down
Loading