Skip to content

Commit

Permalink
[Streams] Make root stream selectively immutable (elastic#205609)
Browse files Browse the repository at this point in the history
## Summary

This closes elastic/streams-program#54.

The root stream is selectively immutable (processing and fields changes
are not allowed).

## UI

For the UI I've entirely disabled the actions column for the root stream
in the schema editor. All of the information (bar the preview table for
changes) available in the flyout for a field is already available in the
table, so this seems easiest for now to avoid multiple logic forks
wrapping buttons etc.

E.g. flyout vs table

![Screenshot 2025-01-02 at 13 41
55](https://github.com/user-attachments/assets/867fd67c-4acc-4457-ad5f-0eb5e9d9ce3f)
  • Loading branch information
Kerry350 authored and crespocarlos committed Jan 8, 2025
1 parent ff00c38 commit e2e480d
Show file tree
Hide file tree
Showing 9 changed files with 356 additions and 121 deletions.
12 changes: 11 additions & 1 deletion x-pack/packages/kbn-streams-schema/src/helpers/type_guards.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

import { ZodSchema } from '@kbn/zod';
import { ZodSchema, custom } from '@kbn/zod';
import {
AndCondition,
conditionSchema,
Expand Down Expand Up @@ -71,6 +71,16 @@ export function isWiredStream(subject: StreamDefinition): subject is WiredStream
return isSchema(wiredStreamDefinitonSchema, subject);
}

const rootStreamSchema = custom<'RootStreamSchema'>((val) => {
return val?.name?.split('.').length === 1;
});

export function isRootStream(subject: any) {
return (
(isWiredStream(subject) || isWiredReadStream(subject)) && isSchema(rootStreamSchema, subject)
);
}

export function isWiredStreamConfig(subject: any): subject is WiredStreamConfigDefinition {
return isSchema(wiredStreamConfigDefinitonSchema, subject);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ export * from './security_exception';
export * from './index_template_not_found';
export * from './fork_condition_missing';
export * from './component_template_not_found';
export * from './root_stream_immutability_exception';
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

export class RootStreamImmutabilityException extends Error {
constructor(message: string) {
super(message);
this.name = 'RootStreamImmutabilityException';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,20 @@ import { IScopedClusterClient } from '@kbn/core-elasticsearch-server';
import { Logger } from '@kbn/logging';
import { badRequest, internal, notFound } from '@hapi/boom';
import {
isRootStream,
isWiredStream,
isWiredStreamConfig,
streamConfigDefinitionSchema,
StreamDefinition,
WiredStreamConfigDefinition,
WiredStreamDefinition,
} from '@kbn/streams-schema';
import { isEqual } from 'lodash';
import {
DefinitionNotFound,
ForkConditionMissing,
IndexTemplateNotFound,
RootStreamImmutabilityException,
SecurityException,
} from '../../lib/streams/errors';
import { createServerRoute } from '../create_server_route';
Expand Down Expand Up @@ -71,7 +74,25 @@ export const editStreamRoute = createServerRoute({
return { acknowledged: true };
}

await validateStreamChildren(scopedClusterClient, params.path.id, params.body.ingest.routing);
const currentStreamDefinition = (await readStream({
scopedClusterClient,
id: params.path.id,
})) as WiredStreamDefinition;

if (isRootStream(streamDefinition)) {
await validateRootStreamChanges(
scopedClusterClient,
currentStreamDefinition,
streamDefinition
);
}

await validateStreamChildren(
scopedClusterClient,
currentStreamDefinition,
params.body.ingest.routing
);

if (isWiredStreamConfig(params.body)) {
await validateAncestorFields(
scopedClusterClient,
Expand Down Expand Up @@ -148,7 +169,8 @@ export const editStreamRoute = createServerRoute({
if (
e instanceof SecurityException ||
e instanceof ForkConditionMissing ||
e instanceof MalformedStreamId
e instanceof MalformedStreamId ||
e instanceof RootStreamImmutabilityException
) {
throw badRequest(e);
}
Expand Down Expand Up @@ -189,15 +211,11 @@ async function updateParentStream(

async function validateStreamChildren(
scopedClusterClient: IScopedClusterClient,
id: string,
currentStreamDefinition: WiredStreamDefinition,
children: WiredStreamConfigDefinition['ingest']['routing']
) {
try {
const oldDefinition = await readStream({
scopedClusterClient,
id,
});
const oldChildren = oldDefinition.stream.ingest.routing.map((child) => child.name);
const oldChildren = currentStreamDefinition.stream.ingest.routing.map((child) => child.name);
const newChildren = new Set(children.map((child) => child.name));
children.forEach((child) => {
validateCondition(child.condition);
Expand All @@ -214,3 +232,31 @@ async function validateStreamChildren(
}
}
}

/*
* Changes to mappings (fields) and processing rules are not allowed on the root stream.
* Changes to routing rules are allowed.
*/
async function validateRootStreamChanges(
scopedClusterClient: IScopedClusterClient,
currentStreamDefinition: WiredStreamDefinition,
nextStreamDefinition: WiredStreamDefinition
) {
const hasFieldChanges = !isEqual(
currentStreamDefinition.stream.ingest.wired.fields,
nextStreamDefinition.stream.ingest.wired.fields
);

if (hasFieldChanges) {
throw new RootStreamImmutabilityException('Root stream fields cannot be changed');
}

const hasProcessingChanges = !isEqual(
currentStreamDefinition.stream.ingest.processing,
nextStreamDefinition.stream.ingest.processing
);

if (hasProcessingChanges) {
throw new RootStreamImmutabilityException('Root stream processing rules cannot be changed');
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import type {
} from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import useToggle from 'react-use/lib/useToggle';
import { isWiredReadStream, ReadStreamDefinition } from '@kbn/streams-schema';
import { isRootStream, isWiredReadStream, ReadStreamDefinition } from '@kbn/streams-schema';
import { FieldType } from './field_type';
import { FieldStatus } from './field_status';
import { FieldEntry, SchemaEditorEditingState } from './hooks/use_editing_state';
Expand Down Expand Up @@ -155,111 +155,113 @@ const FieldsTable = ({ definition, fields, editingState, unpromotingState }: Fie
const [visibleColumns, setVisibleColumns] = useState(Object.keys(COLUMNS));

const trailingColumns = useMemo(() => {
return [
{
id: 'actions',
width: 40,
headerCellRender: () => null,
rowCellRender: ({ rowIndex }) => {
const field = fields[rowIndex];
return !isRootStream(definition)
? ([
{
id: 'actions',
width: 40,
headerCellRender: () => null,
rowCellRender: ({ rowIndex }) => {
const field = fields[rowIndex];

let actions: ActionsCellActionsDescriptor[] = [];
let actions: ActionsCellActionsDescriptor[] = [];

switch (field.status) {
case 'mapped':
actions = [
{
name: i18n.translate('xpack.streams.actions.viewFieldLabel', {
defaultMessage: 'View field',
}),
disabled: editingState.isSaving,
onClick: (fieldEntry: FieldEntry) => {
editingState.selectField(fieldEntry, false);
},
},
{
name: i18n.translate('xpack.streams.actions.editFieldLabel', {
defaultMessage: 'Edit field',
}),
disabled: editingState.isSaving,
onClick: (fieldEntry: FieldEntry) => {
editingState.selectField(fieldEntry, true);
},
},
{
name: i18n.translate('xpack.streams.actions.unpromoteFieldLabel', {
defaultMessage: 'Unmap field',
}),
disabled: unpromotingState.isUnpromotingField,
onClick: (fieldEntry: FieldEntry) => {
unpromotingState.setSelectedField(fieldEntry.name);
},
},
];
break;
case 'unmapped':
actions = [
{
name: i18n.translate('xpack.streams.actions.viewFieldLabel', {
defaultMessage: 'View field',
}),
disabled: editingState.isSaving,
onClick: (fieldEntry: FieldEntry) => {
editingState.selectField(fieldEntry, false);
},
},
{
name: i18n.translate('xpack.streams.actions.mapFieldLabel', {
defaultMessage: 'Map field',
}),
disabled: editingState.isSaving,
onClick: (fieldEntry: FieldEntry) => {
editingState.selectField(fieldEntry, true);
},
},
];
break;
case 'inherited':
actions = [
{
name: i18n.translate('xpack.streams.actions.viewFieldLabel', {
defaultMessage: 'View field',
}),
disabled: editingState.isSaving,
onClick: (fieldEntry: FieldEntry) => {
editingState.selectField(fieldEntry, false);
},
},
];
break;
}
switch (field.status) {
case 'mapped':
actions = [
{
name: i18n.translate('xpack.streams.actions.viewFieldLabel', {
defaultMessage: 'View field',
}),
disabled: editingState.isSaving,
onClick: (fieldEntry: FieldEntry) => {
editingState.selectField(fieldEntry, false);
},
},
{
name: i18n.translate('xpack.streams.actions.editFieldLabel', {
defaultMessage: 'Edit field',
}),
disabled: editingState.isSaving,
onClick: (fieldEntry: FieldEntry) => {
editingState.selectField(fieldEntry, true);
},
},
{
name: i18n.translate('xpack.streams.actions.unpromoteFieldLabel', {
defaultMessage: 'Unmap field',
}),
disabled: unpromotingState.isUnpromotingField,
onClick: (fieldEntry: FieldEntry) => {
unpromotingState.setSelectedField(fieldEntry.name);
},
},
];
break;
case 'unmapped':
actions = [
{
name: i18n.translate('xpack.streams.actions.viewFieldLabel', {
defaultMessage: 'View field',
}),
disabled: editingState.isSaving,
onClick: (fieldEntry: FieldEntry) => {
editingState.selectField(fieldEntry, false);
},
},
{
name: i18n.translate('xpack.streams.actions.mapFieldLabel', {
defaultMessage: 'Map field',
}),
disabled: editingState.isSaving,
onClick: (fieldEntry: FieldEntry) => {
editingState.selectField(fieldEntry, true);
},
},
];
break;
case 'inherited':
actions = [
{
name: i18n.translate('xpack.streams.actions.viewFieldLabel', {
defaultMessage: 'View field',
}),
disabled: editingState.isSaving,
onClick: (fieldEntry: FieldEntry) => {
editingState.selectField(fieldEntry, false);
},
},
];
break;
}

return (
<ActionsCell
panels={[
{
id: 0,
title: i18n.translate(
'xpack.streams.streamDetailSchemaEditorFieldsTableActionsTitle',
return (
<ActionsCell
panels={[
{
defaultMessage: 'Actions',
}
),
items: actions.map((action) => ({
name: action.name,
icon: action.icon,
onClick: (event) => {
action.onClick(field);
id: 0,
title: i18n.translate(
'xpack.streams.streamDetailSchemaEditorFieldsTableActionsTitle',
{
defaultMessage: 'Actions',
}
),
items: actions.map((action) => ({
name: action.name,
icon: action.icon,
onClick: (event) => {
action.onClick(field);
},
})),
},
})),
},
]}
/>
);
},
},
] as EuiDataGridProps['trailingControlColumns'];
}, [editingState, fields, unpromotingState]);
]}
/>
);
},
},
] as EuiDataGridProps['trailingControlColumns'])
: undefined;
}, [definition, editingState, fields, unpromotingState]);

return (
<EuiDataGrid
Expand Down
Loading

0 comments on commit e2e480d

Please sign in to comment.