From 363f36949ad1ec4b85bc030a8486835ae76f61f9 Mon Sep 17 00:00:00 2001 From: Kerry Gallagher <471693+Kerry350@users.noreply.github.com> Date: Thu, 2 Jan 2025 15:21:14 +0000 Subject: [PATCH 1/3] Make root stream selectively immutable --- .../src/helpers/type_guards.ts | 12 +- .../server/lib/streams/errors/index.ts | 1 + .../root_stream_immutability_exception.ts | 13 ++ .../streams/server/routes/streams/edit.ts | 42 +++- .../fields_table.tsx | 204 +++++++++--------- .../apis/streams/helpers/requests.ts | 9 +- .../api_integration/apis/streams/index.ts | 1 + .../apis/streams/root_stream.ts | 116 ++++++++++ 8 files changed, 293 insertions(+), 105 deletions(-) create mode 100644 x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/root_stream_immutability_exception.ts create mode 100644 x-pack/test/api_integration/apis/streams/root_stream.ts diff --git a/x-pack/packages/kbn-streams-schema/src/helpers/type_guards.ts b/x-pack/packages/kbn-streams-schema/src/helpers/type_guards.ts index 557513fa74bb2..2f7d59f420bc1 100644 --- a/x-pack/packages/kbn-streams-schema/src/helpers/type_guards.ts +++ b/x-pack/packages/kbn-streams-schema/src/helpers/type_guards.ts @@ -5,7 +5,7 @@ * 2.0. */ -import { ZodSchema } from '@kbn/zod'; +import { ZodSchema, custom } from '@kbn/zod'; import { AndCondition, conditionSchema, @@ -75,6 +75,16 @@ export function isWiredStream( return isSchema(wiredStreamDefinitonSchema, subject); } +const rootStreamSchema = custom<'RootStreamSchema'>((val) => { + return val?.name?.split('.').length === 1; +}); + +export function isRootStream(subject: any) { + return ( + (isWiredStream(subject) || isWiredReadStream(subject)) && isSchema(rootStreamSchema, subject) + ); +} + export function isWiredStreamConfig(subject: any): subject is WiredStreamConfigDefinition { return isSchema(wiredStreamConfigDefinitonSchema, subject); } diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/index.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/index.ts index 73842ef3018fe..605465d7bcac7 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/index.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/index.ts @@ -13,3 +13,4 @@ export * from './security_exception'; export * from './index_template_not_found'; export * from './fork_condition_missing'; export * from './component_template_not_found'; +export * from './root_stream_immutability_exception'; diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/root_stream_immutability_exception.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/root_stream_immutability_exception.ts new file mode 100644 index 0000000000000..4b1573f0ff01b --- /dev/null +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/root_stream_immutability_exception.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class RootStreamImmutabilityException extends Error { + constructor(message: string) { + super(message); + this.name = 'RootStreamImmutabilityException'; + } +} diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/edit.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/edit.ts index cf88835602076..70ef8783a4ee6 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/edit.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/edit.ts @@ -10,6 +10,7 @@ import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; import { Logger } from '@kbn/logging'; import { badRequest, internal, notFound } from '@hapi/boom'; import { + isRootStream, isWiredStream, isWiredStreamConfig, streamConfigDefinitionSchema, @@ -17,10 +18,12 @@ import { WiredStreamConfigDefinition, WiredStreamDefinition, } from '@kbn/streams-schema'; +import fastDeepEqual from 'fast-deep-equal'; import { DefinitionNotFound, ForkConditionMissing, IndexTemplateNotFound, + RootStreamImmutabilityException, SecurityException, } from '../../lib/streams/errors'; import { createServerRoute } from '../create_server_route'; @@ -69,6 +72,10 @@ export const editStreamRoute = createServerRoute({ return { acknowledged: true }; } + if (isRootStream(streamDefinition)) { + await validateRootStreamChanges(scopedClusterClient, streamDefinition); + } + await validateStreamChildren(scopedClusterClient, params.path.id, params.body.ingest.routing); if (isWiredStreamConfig(params.body)) { await validateAncestorFields( @@ -143,7 +150,8 @@ export const editStreamRoute = createServerRoute({ if ( e instanceof SecurityException || e instanceof ForkConditionMissing || - e instanceof MalformedStreamId + e instanceof MalformedStreamId || + e instanceof RootStreamImmutabilityException ) { throw badRequest(e); } @@ -207,3 +215,35 @@ async function validateStreamChildren( } } } + +/* + * Changes to mappings (fields) and processing rules are not allowed on the root stream. + * Changes to routing rules are allowed. + */ +async function validateRootStreamChanges( + scopedClusterClient: IScopedClusterClient, + nextStreamDefinition: WiredStreamDefinition +) { + const oldDefinition = (await readStream({ + scopedClusterClient, + id: nextStreamDefinition.name, + })) as WiredStreamDefinition; + + const hasFieldChanges = !fastDeepEqual( + oldDefinition.stream.ingest.wired.fields, + nextStreamDefinition.stream.ingest.wired.fields + ); + + if (hasFieldChanges) { + throw new RootStreamImmutabilityException('Root stream fields cannot be changed'); + } + + const hasProcessingChanges = !fastDeepEqual( + oldDefinition.stream.ingest.processing, + nextStreamDefinition.stream.ingest.processing + ); + + if (hasProcessingChanges) { + throw new RootStreamImmutabilityException('Root stream processing rules cannot be changed'); + } +} diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/fields_table.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/fields_table.tsx index 1b6d0131a6bc2..1f068d9b0ac16 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/fields_table.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/fields_table.tsx @@ -22,7 +22,7 @@ import type { } from '@elastic/eui'; import { i18n } from '@kbn/i18n'; import useToggle from 'react-use/lib/useToggle'; -import { isWiredReadStream, ReadStreamDefinition } from '@kbn/streams-schema'; +import { isRootStream, isWiredReadStream, ReadStreamDefinition } from '@kbn/streams-schema'; import { FieldType } from './field_type'; import { FieldStatus } from './field_status'; import { FieldEntry, SchemaEditorEditingState } from './hooks/use_editing_state'; @@ -155,111 +155,113 @@ const FieldsTable = ({ definition, fields, editingState, unpromotingState }: Fie const [visibleColumns, setVisibleColumns] = useState(Object.keys(COLUMNS)); const trailingColumns = useMemo(() => { - return [ - { - id: 'actions', - width: 40, - headerCellRender: () => null, - rowCellRender: ({ rowIndex }) => { - const field = fields[rowIndex]; + return !isRootStream(definition) + ? ([ + { + id: 'actions', + width: 40, + headerCellRender: () => null, + rowCellRender: ({ rowIndex }) => { + const field = fields[rowIndex]; - let actions: ActionsCellActionsDescriptor[] = []; + let actions: ActionsCellActionsDescriptor[] = []; - switch (field.status) { - case 'mapped': - actions = [ - { - name: i18n.translate('xpack.streams.actions.viewFieldLabel', { - defaultMessage: 'View field', - }), - disabled: editingState.isSaving, - onClick: (fieldEntry: FieldEntry) => { - editingState.selectField(fieldEntry, false); - }, - }, - { - name: i18n.translate('xpack.streams.actions.editFieldLabel', { - defaultMessage: 'Edit field', - }), - disabled: editingState.isSaving, - onClick: (fieldEntry: FieldEntry) => { - editingState.selectField(fieldEntry, true); - }, - }, - { - name: i18n.translate('xpack.streams.actions.unpromoteFieldLabel', { - defaultMessage: 'Unmap field', - }), - disabled: unpromotingState.isUnpromotingField, - onClick: (fieldEntry: FieldEntry) => { - unpromotingState.setSelectedField(fieldEntry.name); - }, - }, - ]; - break; - case 'unmapped': - actions = [ - { - name: i18n.translate('xpack.streams.actions.viewFieldLabel', { - defaultMessage: 'View field', - }), - disabled: editingState.isSaving, - onClick: (fieldEntry: FieldEntry) => { - editingState.selectField(fieldEntry, false); - }, - }, - { - name: i18n.translate('xpack.streams.actions.mapFieldLabel', { - defaultMessage: 'Map field', - }), - disabled: editingState.isSaving, - onClick: (fieldEntry: FieldEntry) => { - editingState.selectField(fieldEntry, true); - }, - }, - ]; - break; - case 'inherited': - actions = [ - { - name: i18n.translate('xpack.streams.actions.viewFieldLabel', { - defaultMessage: 'View field', - }), - disabled: editingState.isSaving, - onClick: (fieldEntry: FieldEntry) => { - editingState.selectField(fieldEntry, false); - }, - }, - ]; - break; - } + switch (field.status) { + case 'mapped': + actions = [ + { + name: i18n.translate('xpack.streams.actions.viewFieldLabel', { + defaultMessage: 'View field', + }), + disabled: editingState.isSaving, + onClick: (fieldEntry: FieldEntry) => { + editingState.selectField(fieldEntry, false); + }, + }, + { + name: i18n.translate('xpack.streams.actions.editFieldLabel', { + defaultMessage: 'Edit field', + }), + disabled: editingState.isSaving, + onClick: (fieldEntry: FieldEntry) => { + editingState.selectField(fieldEntry, true); + }, + }, + { + name: i18n.translate('xpack.streams.actions.unpromoteFieldLabel', { + defaultMessage: 'Unmap field', + }), + disabled: unpromotingState.isUnpromotingField, + onClick: (fieldEntry: FieldEntry) => { + unpromotingState.setSelectedField(fieldEntry.name); + }, + }, + ]; + break; + case 'unmapped': + actions = [ + { + name: i18n.translate('xpack.streams.actions.viewFieldLabel', { + defaultMessage: 'View field', + }), + disabled: editingState.isSaving, + onClick: (fieldEntry: FieldEntry) => { + editingState.selectField(fieldEntry, false); + }, + }, + { + name: i18n.translate('xpack.streams.actions.mapFieldLabel', { + defaultMessage: 'Map field', + }), + disabled: editingState.isSaving, + onClick: (fieldEntry: FieldEntry) => { + editingState.selectField(fieldEntry, true); + }, + }, + ]; + break; + case 'inherited': + actions = [ + { + name: i18n.translate('xpack.streams.actions.viewFieldLabel', { + defaultMessage: 'View field', + }), + disabled: editingState.isSaving, + onClick: (fieldEntry: FieldEntry) => { + editingState.selectField(fieldEntry, false); + }, + }, + ]; + break; + } - return ( - ({ - name: action.name, - icon: action.icon, - onClick: (event) => { - action.onClick(field); + id: 0, + title: i18n.translate( + 'xpack.streams.streamDetailSchemaEditorFieldsTableActionsTitle', + { + defaultMessage: 'Actions', + } + ), + items: actions.map((action) => ({ + name: action.name, + icon: action.icon, + onClick: (event) => { + action.onClick(field); + }, + })), }, - })), - }, - ]} - /> - ); - }, - }, - ] as EuiDataGridProps['trailingControlColumns']; - }, [editingState, fields, unpromotingState]); + ]} + /> + ); + }, + }, + ] as EuiDataGridProps['trailingControlColumns']) + : undefined; + }, [definition, editingState, fields, unpromotingState]); return ( { + before(async () => { + await enableStreams(supertest); + }); + + after(async () => { + await cleanUpRootStream(esClient); + }); + + it('Should not allow processing changes', async () => { + const body: WiredStreamConfigDefinition = { + ingest: { + ...rootStreamDefinition.stream.ingest, + processing: [ + { + config: { + grok: { + field: 'message', + patterns: [ + '%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}', + ], + }, + }, + }, + ], + }, + }; + const response = await putStream(supertest, 'logs', body, 400); + expect(response).to.have.property( + 'message', + 'Root stream processing rules cannot be changed' + ); + }); + + it('Should not allow fields changes', async () => { + const body: WiredStreamConfigDefinition = { + ingest: { + ...rootStreamDefinition.stream.ingest, + wired: { + fields: { + ...rootStreamDefinition.stream.ingest.wired.fields, + 'log.level': { + type: 'boolean', + }, + }, + }, + }, + }; + const response = await putStream(supertest, 'logs', body, 400); + expect(response).to.have.property('message', 'Root stream fields cannot be changed'); + }); + + it('Should allow routing changes', async () => { + const body: WiredStreamConfigDefinition = { + ingest: { + ...rootStreamDefinition.stream.ingest, + routing: [ + { + name: 'logs.gcpcloud', + condition: { + field: 'cloud.provider', + operator: 'eq', + value: 'gcp', + }, + }, + ], + }, + }; + const response = await putStream(supertest, 'logs', body); + expect(response).to.have.property('acknowledged', true); + }); + }); +} From 9eafd43ac5ec606bb4771f56c7a8772fca7cf62d Mon Sep 17 00:00:00 2001 From: Kerry Gallagher <471693+Kerry350@users.noreply.github.com> Date: Mon, 6 Jan 2025 14:04:23 +0000 Subject: [PATCH 2/3] Fix enrichment tests --- .../apis/streams/enrichment.ts | 59 ++++++++++++++++--- 1 file changed, 50 insertions(+), 9 deletions(-) diff --git a/x-pack/test/api_integration/apis/streams/enrichment.ts b/x-pack/test/api_integration/apis/streams/enrichment.ts index e9fb604438ee6..95665be7a2a89 100644 --- a/x-pack/test/api_integration/apis/streams/enrichment.ts +++ b/x-pack/test/api_integration/apis/streams/enrichment.ts @@ -8,7 +8,14 @@ import expect from '@kbn/expect'; import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/types'; import { WiredStreamConfigDefinition } from '@kbn/streams-schema'; -import { enableStreams, fetchDocument, indexDocument, putStream } from './helpers/requests'; +import { + deleteStream, + enableStreams, + fetchDocument, + forkStream, + indexDocument, + putStream, +} from './helpers/requests'; import { FtrProviderContext } from '../../ftr_provider_context'; import { waitForDocumentInIndex } from '../../../alerting_api_integration/observability/helpers/alerting_wait_for_helpers'; import { cleanUpRootStream } from './helpers/cleanup'; @@ -21,11 +28,24 @@ export default function ({ getService }: FtrProviderContext) { describe('Enrichment', () => { after(async () => { + await deleteStream(supertest, 'logs.nginx'); await cleanUpRootStream(esClient); }); before(async () => { await enableStreams(supertest); + const body = { + stream: { + name: 'logs.nginx', + }, + condition: { + field: 'host.name', + operator: 'eq', + value: 'routeme', + }, + }; + // We use a forked stream as processing changes cannot be made to the root stream + await forkStream(supertest, 'logs', body); }); it('Place processing steps', async () => { @@ -78,7 +98,7 @@ export default function ({ getService }: FtrProviderContext) { }, }, }; - const response = await putStream(supertest, 'logs', body); + const response = await putStream(supertest, 'logs.nginx', body); expect(response).to.have.property('acknowledged', true); }); @@ -86,15 +106,28 @@ export default function ({ getService }: FtrProviderContext) { const doc = { '@timestamp': '2024-01-01T00:00:10.000Z', message: '2023-01-01T00:00:10.000Z error test', + ['host.name']: 'routeme', }; const response = await indexDocument(esClient, 'logs', doc); expect(response.result).to.eql('created'); - await waitForDocumentInIndex({ esClient, indexName: 'logs', retryService, logger }); + const reroutedDocResponse = await waitForDocumentInIndex({ + esClient, + indexName: 'logs.nginx', + retryService, + logger, + }); - const result = await fetchDocument(esClient, 'logs', response._id); + const result = await fetchDocument( + esClient, + 'logs.nginx', + reroutedDocResponse.hits?.hits[0]?._id! + ); expect(result._source).to.eql({ '@timestamp': '2024-01-01T00:00:10.000Z', message: '2023-01-01T00:00:10.000Z error test', + host: { + name: 'routeme', + }, inner_timestamp: '2023-01-01T00:00:10.000Z', message2: 'test', log: { @@ -107,22 +140,30 @@ export default function ({ getService }: FtrProviderContext) { const doc = { '@timestamp': '2024-01-01T00:00:11.000Z', message: '2023-01-01T00:00:10.000Z info mylogger this is the message', + ['host.name']: 'routeme', }; const response = await indexDocument(esClient, 'logs', doc); expect(response.result).to.eql('created'); - await waitForDocumentInIndex({ + const reroutedDocResponse = await waitForDocumentInIndex({ esClient, - indexName: 'logs', + indexName: 'logs.nginx', retryService, logger, docCountTarget: 2, }); - const result = await fetchDocument(esClient, 'logs', response._id); + const result = await fetchDocument( + esClient, + 'logs.nginx', + reroutedDocResponse.hits?.hits[0]?._id! + ); expect(result._source).to.eql({ '@timestamp': '2024-01-01T00:00:11.000Z', message: '2023-01-01T00:00:10.000Z info mylogger this is the message', inner_timestamp: '2023-01-01T00:00:10.000Z', + host: { + name: 'routeme', + }, log: { level: 'info', logger: 'mylogger', @@ -134,7 +175,7 @@ export default function ({ getService }: FtrProviderContext) { it('Doc is searchable', async () => { const response = await esClient.search({ - index: 'logs', + index: 'logs.nginx', body: { query: { match: { @@ -148,7 +189,7 @@ export default function ({ getService }: FtrProviderContext) { it('Non-indexed field is not searchable', async () => { const response = await esClient.search({ - index: 'logs', + index: 'logs.nginx', body: { query: { match: { From 8b234deb527ceb74c97971ead07be260634d74fd Mon Sep 17 00:00:00 2001 From: Kerry Gallagher <471693+Kerry350@users.noreply.github.com> Date: Tue, 7 Jan 2025 12:18:11 +0000 Subject: [PATCH 3/3] Switch to lodash and fetch definition in route handler --- .../streams/server/routes/streams/edit.ts | 42 +++++++++++-------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/edit.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/edit.ts index 70ef8783a4ee6..2faec4e75a227 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/edit.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/edit.ts @@ -18,7 +18,7 @@ import { WiredStreamConfigDefinition, WiredStreamDefinition, } from '@kbn/streams-schema'; -import fastDeepEqual from 'fast-deep-equal'; +import { isEqual } from 'lodash'; import { DefinitionNotFound, ForkConditionMissing, @@ -72,11 +72,25 @@ export const editStreamRoute = createServerRoute({ return { acknowledged: true }; } + const currentStreamDefinition = (await readStream({ + scopedClusterClient, + id: params.path.id, + })) as WiredStreamDefinition; + if (isRootStream(streamDefinition)) { - await validateRootStreamChanges(scopedClusterClient, streamDefinition); + await validateRootStreamChanges( + scopedClusterClient, + currentStreamDefinition, + streamDefinition + ); } - await validateStreamChildren(scopedClusterClient, params.path.id, params.body.ingest.routing); + await validateStreamChildren( + scopedClusterClient, + currentStreamDefinition, + params.body.ingest.routing + ); + if (isWiredStreamConfig(params.body)) { await validateAncestorFields( scopedClusterClient, @@ -190,15 +204,11 @@ async function updateParentStream( async function validateStreamChildren( scopedClusterClient: IScopedClusterClient, - id: string, + currentStreamDefinition: WiredStreamDefinition, children: WiredStreamConfigDefinition['ingest']['routing'] ) { try { - const oldDefinition = await readStream({ - scopedClusterClient, - id, - }); - const oldChildren = oldDefinition.stream.ingest.routing.map((child) => child.name); + const oldChildren = currentStreamDefinition.stream.ingest.routing.map((child) => child.name); const newChildren = new Set(children.map((child) => child.name)); children.forEach((child) => { validateCondition(child.condition); @@ -222,15 +232,11 @@ async function validateStreamChildren( */ async function validateRootStreamChanges( scopedClusterClient: IScopedClusterClient, + currentStreamDefinition: WiredStreamDefinition, nextStreamDefinition: WiredStreamDefinition ) { - const oldDefinition = (await readStream({ - scopedClusterClient, - id: nextStreamDefinition.name, - })) as WiredStreamDefinition; - - const hasFieldChanges = !fastDeepEqual( - oldDefinition.stream.ingest.wired.fields, + const hasFieldChanges = !isEqual( + currentStreamDefinition.stream.ingest.wired.fields, nextStreamDefinition.stream.ingest.wired.fields ); @@ -238,8 +244,8 @@ async function validateRootStreamChanges( throw new RootStreamImmutabilityException('Root stream fields cannot be changed'); } - const hasProcessingChanges = !fastDeepEqual( - oldDefinition.stream.ingest.processing, + const hasProcessingChanges = !isEqual( + currentStreamDefinition.stream.ingest.processing, nextStreamDefinition.stream.ingest.processing );