diff --git a/filebeat/docs/inputs/input-filestream.asciidoc b/filebeat/docs/inputs/input-filestream.asciidoc
index 54283d6cce79..c6f206932296 100644
--- a/filebeat/docs/inputs/input-filestream.asciidoc
+++ b/filebeat/docs/inputs/input-filestream.asciidoc
@@ -165,19 +165,20 @@ include::../inputs/input-filestream-reader-options.asciidoc[]
 
 This input exposes metrics under the <<http-endpoint, HTTP monitoring endpoint>>.
 These metrics are exposed under the `/inputs` path. They can be used to
-observe the activity of the input.
+observe the activity of the input. Note that metrics from processors are not included.
 
 [options="header"]
 |=======
-| Metric                    | Description
-| `files_opened_total`      | Total number of files opened.
-| `files_closed_total`      | Total number of files closed.
-| `files_active`            | Number of files currently open (gauge).
-| `messages_read_total`     | Total number of messages read.
-| `bytes_processed_total`   | Total number of bytes processed.
-| `events_processed_total`  | Total number of events processed.
-| `processing_errors_total` | Total number of processing errors.
-| `processing_time`         | Histogram of the elapsed time to process messages (expressed in nanoseconds).
+| Metric                     | Description
+| `files_opened_total`        | Total number of files opened.
+| `files_closed_total`        | Total number of files closed.
+| `files_active`              | Number of files currently open (gauge).
+| `messages_read_total`      | Total number of messages read.
+| `messages_truncated_total` | Total number of messages truncated.
+| `bytes_processed_total`    | Total number of bytes processed.
+| `events_processed_total`   | Total number of events processed.
+| `processing_errors_total`  | Total number of processing errors.
+| `processing_time`          | Histogram of the elapsed time to process messages (expressed in nanoseconds).
 |=======
 
 Note:
diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go
index f9804bb16f32..66696087d9ff 100644
--- a/filebeat/input/filestream/environment_test.go
+++ b/filebeat/input/filestream/environment_test.go
@@ -345,7 +345,7 @@ func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename, inputID str
 func (e *inputTestingEnvironment) requireOffsetInRegistryByID(key string, expectedOffset int) {
 	entry, err := e.getRegistryState(key)
 	if err != nil {
-		e.t.Fatalf(err.Error())
+		e.t.Fatal(err.Error())
 	}
 
 	require.Equal(e.t, expectedOffset, entry.Cursor.Offset)
@@ -415,12 +415,18 @@ func (e *inputTestingEnvironment) waitUntilEventCountCtx(ctx context.Context, co
 	select {
 	case <-ctx.Done():
 		logLines := map[string][]string{}
-		for _, e := range e.pipeline.GetAllEvents() {
-			flat := e.Fields.Flatten()
+		for _, evt := range e.pipeline.GetAllEvents() {
+			flat := evt.Fields.Flatten()
 			pathi, _ := flat.GetValue("log.file.path")
-			path := pathi.(string)
+			path, ok := pathi.(string)
+			if !ok {
+				e.t.Fatalf("waitUntilEventCountCtx: path is not a string: %v", pathi)
+			}
 			msgi, _ := flat.GetValue("message")
-			msg := msgi.(string)
+			msg, ok := msgi.(string)
+			if !ok {
+				e.t.Fatalf("waitUntilEventCountCtx: message is not a string: %v", msgi)
+			}
 			logLines[path] = append(logLines[path], msg)
 		}
 
@@ -462,7 +468,10 @@ func (e *inputTestingEnvironment) requireEventsReceived(events []string) {
 			if len(events) == checkedEventCount {
 				e.t.Fatalf("not enough expected elements")
 			}
-			message := evt.Fields["message"].(string)
+			message, ok := evt.Fields["message"].(string)
+			if !ok {
+				e.t.Fatalf("message is not string %+v", evt.Fields["message"])
+			}
 			if message == events[checkedEventCount] {
 				foundEvents[checkedEventCount] = true
 			}
diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go
index c2efe2c50cdd..0e2a66a23075 100644
--- a/filebeat/input/filestream/input.go
+++ b/filebeat/input/filestream/input.go
@@ -22,6 +22,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"slices"
 	"time"
 
 	"golang.org/x/text/transform"
@@ -380,6 +381,15 @@ func (inp *filestream) readFromSource(
 
 		s.Offset += int64(message.Bytes) + int64(message.Offset)
 
+		flags, err := message.Fields.GetValue("log.flags")
+		if err == nil {
+			if flags, ok := flags.([]string); ok {
+				if slices.Contains(flags, "truncated") { //nolint:typecheck,nolintlint // linter fails to infer generics
+					metrics.MessagesTruncated.Add(1)
+				}
+			}
+		}
+
 		metrics.MessagesRead.Inc()
 		if message.IsEmpty() || inp.isDroppedLine(log, string(message.Content)) {
 			continue
diff --git a/filebeat/input/filestream/internal/input-logfile/metrics.go b/filebeat/input/filestream/internal/input-logfile/metrics.go
index 194e72f614d1..edc6e9159346 100644
--- a/filebeat/input/filestream/internal/input-logfile/metrics.go
+++ b/filebeat/input/filestream/internal/input-logfile/metrics.go
@@ -29,14 +29,15 @@ import (
 type Metrics struct {
 	unregister func()
 
-	FilesOpened      *monitoring.Uint // Number of files that have been opened.
-	FilesClosed      *monitoring.Uint // Number of files closed.
-	FilesActive      *monitoring.Uint // Number of files currently open (gauge).
-	MessagesRead     *monitoring.Uint // Number of messages read.
-	BytesProcessed   *monitoring.Uint // Number of bytes processed.
-	EventsProcessed  *monitoring.Uint // Number of events processed.
-	ProcessingErrors *monitoring.Uint // Number of processing errors.
-	ProcessingTime   metrics.Sample   // Histogram of the elapsed time for processing an event.
+	FilesOpened       *monitoring.Uint // Number of files that have been opened.
+	FilesClosed       *monitoring.Uint // Number of files closed.
+	FilesActive       *monitoring.Uint // Number of files currently open (gauge).
+	MessagesRead      *monitoring.Uint // Number of messages read.
+	MessagesTruncated *monitoring.Uint // Number of messages truncated.
+	BytesProcessed    *monitoring.Uint // Number of bytes processed.
+	EventsProcessed   *monitoring.Uint // Number of events processed.
+	ProcessingErrors  *monitoring.Uint // Number of processing errors.
+	ProcessingTime    metrics.Sample   // Histogram of the elapsed time for processing an event.
 
 	// Those metrics use the same registry/keys as the log input uses
 	HarvesterStarted   *monitoring.Int
@@ -65,15 +66,16 @@ func NewMetrics(id string) *Metrics {
 
 	reg, unreg := inputmon.NewInputRegistry("filestream", id, nil)
 	m := Metrics{
-		unregister:       unreg,
-		FilesOpened:      monitoring.NewUint(reg, "files_opened_total"),
-		FilesClosed:      monitoring.NewUint(reg, "files_closed_total"),
-		FilesActive:      monitoring.NewUint(reg, "files_active"),
-		MessagesRead:     monitoring.NewUint(reg, "messages_read_total"),
-		BytesProcessed:   monitoring.NewUint(reg, "bytes_processed_total"),
-		EventsProcessed:  monitoring.NewUint(reg, "events_processed_total"),
-		ProcessingErrors: monitoring.NewUint(reg, "processing_errors_total"),
-		ProcessingTime:   metrics.NewUniformSample(1024),
+		unregister:        unreg,
+		FilesOpened:       monitoring.NewUint(reg, "files_opened_total"),
+		FilesClosed:       monitoring.NewUint(reg, "files_closed_total"),
+		FilesActive:       monitoring.NewUint(reg, "files_active"),
+		MessagesRead:      monitoring.NewUint(reg, "messages_read_total"),
+		MessagesTruncated: monitoring.NewUint(reg, "messages_truncated_total"),
+		BytesProcessed:    monitoring.NewUint(reg, "bytes_processed_total"),
+		EventsProcessed:   monitoring.NewUint(reg, "events_processed_total"),
+		ProcessingErrors:  monitoring.NewUint(reg, "processing_errors_total"),
+		ProcessingTime:    metrics.NewUniformSample(1024),
 
 		HarvesterStarted:   monitoring.NewInt(harvesterMetrics, "started"),
 		HarvesterClosed:    monitoring.NewInt(harvesterMetrics, "closed"),
diff --git a/filebeat/input/filestream/metrics_integration_test.go b/filebeat/input/filestream/metrics_integration_test.go
index 3671f076d0ed..e34f71bc53e3 100644
--- a/filebeat/input/filestream/metrics_integration_test.go
+++ b/filebeat/input/filestream/metrics_integration_test.go
@@ -33,14 +33,37 @@ func TestFilestreamMetrics(t *testing.T) {
 
 	testlogName := "test.log"
 	inp := env.mustCreateInput(map[string]interface{}{
+<<<<<<< HEAD
 		"id":                                   "fake-ID",
 		"paths":                                []string{env.abspath(testlogName)},
 		"prospector.scanner.check_interval":    "24h",
 		"close.on_state_change.check_interval": "100ms",
 		"close.on_state_change.inactive":       "2s",
+=======
+		"id":                                     "fake-ID",
+		"paths":                                  []string{env.abspath(testlogName)},
+		"prospector.scanner.check_interval":      "24h",
+		"close.on_state_change.check_interval":   "100ms",
+		"close.on_state_change.inactive":         "2s",
+		"prospector.scanner.fingerprint.enabled": false,
+		"file_identity.native":                   map[string]any{},
+		"message_max_bytes":                      20,
+		"parsers": []map[string]interface{}{
+			{
+				"multiline": map[string]interface{}{
+					"type":      "pattern",
+					"pattern":   "^multiline",
+					"negate":    true,
+					"match":     "after",
+					"max_lines": 1,
+					"timeout":   "1s",
+				},
+			},
+		},
+>>>>>>> 7806f1a2c (filebeat/inputs/filestream: add metric for messages truncated (#41667))
 	})
 
-	testlines := []byte("first line\nsecond line\nthird line\n")
+	testlines := []byte("first line\nsecond line\nthird line\nthis is a very long line exceeding message_max_bytes\nmultiline first line\nmultiline second line\n")
 	env.mustWriteToFile(testlogName, testlines)
 
 	ctx, cancelInput := context.WithCancel(context.Background())
@@ -51,13 +74,105 @@ func TestFilestreamMetrics(t *testing.T) {
 	env.waitUntilHarvesterIsDone()
 
 	checkMetrics(t, "fake-ID", expectedMetrics{
-		FilesOpened:      1,
-		FilesClosed:      1,
-		FilesActive:      0,
-		MessagesRead:     3,
-		BytesProcessed:   34,
-		EventsProcessed:  3,
-		ProcessingErrors: 0,
+		FilesOpened:       1,
+		FilesClosed:       1,
+		FilesActive:       0,
+		MessagesRead:      3,
+		MessagesTruncated: 2,
+		BytesProcessed:    130,
+		EventsProcessed:   3,
+		ProcessingErrors:  0,
+	})
+
+	cancelInput()
+	env.waitUntilInputStops()
+}
+
+func TestFilestreamMessageMaxBytesTruncatedMetric(t *testing.T) {
+	env := newInputTestingEnvironment(t)
+
+	testlogName := "test.log"
+	inp := env.mustCreateInput(map[string]interface{}{
+		"id":                                     "fake-ID",
+		"paths":                                  []string{env.abspath(testlogName)},
+		"prospector.scanner.check_interval":      "24h",
+		"close.on_state_change.check_interval":   "100ms",
+		"close.on_state_change.inactive":         "2s",
+		"prospector.scanner.fingerprint.enabled": false,
+		"file_identity.native":                   map[string]any{},
+		"message_max_bytes":                      20,
+	})
+
+	testlines := []byte("first line\nsecond line\nthird line\nthis is a long line exceeding message_max_bytes\n")
+	env.mustWriteToFile(testlogName, testlines)
+
+	ctx, cancelInput := context.WithCancel(context.Background())
+	env.startInput(ctx, inp)
+
+	env.waitUntilEventCount(4)
+	env.requireOffsetInRegistry(testlogName, "fake-ID", len(testlines))
+	env.waitUntilHarvesterIsDone()
+
+	checkMetrics(t, "fake-ID", expectedMetrics{
+		FilesOpened:       1,
+		FilesClosed:       1,
+		FilesActive:       0,
+		MessagesRead:      4,
+		MessagesTruncated: 1,
+		BytesProcessed:    82,
+		EventsProcessed:   4,
+		ProcessingErrors:  0,
+	})
+
+	cancelInput()
+	env.waitUntilInputStops()
+}
+
+func TestFilestreamMultilineMaxLinesTruncatedMetric(t *testing.T) {
+	env := newInputTestingEnvironment(t)
+
+	testlogName := "test.log"
+	inp := env.mustCreateInput(map[string]interface{}{
+		"id":                                     "fake-ID",
+		"paths":                                  []string{env.abspath(testlogName)},
+		"prospector.scanner.check_interval":      "24h",
+		"close.on_state_change.check_interval":   "100ms",
+		"close.on_state_change.inactive":         "2s",
+		"prospector.scanner.fingerprint.enabled": false,
+		"file_identity.native":                   map[string]any{},
+		"parsers": []map[string]interface{}{
+			{
+				"multiline": map[string]interface{}{
+					"type":      "pattern",
+					"pattern":   "^multiline",
+					"negate":    true,
+					"match":     "after",
+					"max_lines": 1,
+					"timeout":   "1s",
+				},
+			},
+		},
+	})
+
+	testlines := []byte("first line\nsecond line\nmultiline first line\nmultiline second line\n")
+	env.mustWriteToFile(testlogName, testlines)
+
+	ctx, cancelInput := context.WithCancel(context.Background())
+	env.startInput(ctx, inp)
+
+	env.waitUntilEventCount(3)
+	env.requireOffsetInRegistry(testlogName, "fake-ID", len(testlines))
+	env.waitUntilHarvesterIsDone()
+
+	checkMetrics(t, "fake-ID", expectedMetrics{
+		FilesOpened:       1,
+		FilesClosed:       1,
+		FilesActive:       0,
+		MessagesRead:      3,
+		MessagesTruncated: 1,
+		BytesProcessed:    66,
+		EventsProcessed:   3,
+		ProcessingErrors:  0,
 	})
 
 	cancelInput()
@@ -65,23 +180,26 @@ func TestFilestreamMetrics(t *testing.T) {
 }
 
 type expectedMetrics struct {
-	FilesOpened      uint64
-	FilesClosed      uint64
-	FilesActive      uint64
-	MessagesRead     uint64
-	BytesProcessed   uint64
-	EventsProcessed  uint64
-	ProcessingErrors uint64
+	FilesOpened       uint64
+	FilesClosed       uint64
+	FilesActive       uint64
+	MessagesRead      uint64
+	MessagesTruncated uint64
+	BytesProcessed    uint64
+	EventsProcessed   uint64
+	ProcessingErrors  uint64
 }
 
 func checkMetrics(t *testing.T, id string, expected expectedMetrics) {
-	reg := monitoring.GetNamespace("dataset").GetRegistry().Get(id).(*monitoring.Registry)
+	reg, ok := monitoring.GetNamespace("dataset").GetRegistry().Get(id).(*monitoring.Registry)
+	require.True(t, ok, "registry not found")
 
 	require.Equal(t, id, reg.Get("id").(*monitoring.String).Get(), "id")
 	require.Equal(t, "filestream", reg.Get("input").(*monitoring.String).Get(), "input")
 	require.Equal(t, expected.FilesOpened, reg.Get("files_opened_total").(*monitoring.Uint).Get(), "files_opened_total")
 	require.Equal(t, expected.FilesClosed, reg.Get("files_closed_total").(*monitoring.Uint).Get(), "files_closed_total")
 	require.Equal(t, expected.MessagesRead, reg.Get("messages_read_total").(*monitoring.Uint).Get(), "messages_read_total")
+	require.Equal(t, expected.MessagesTruncated, reg.Get("messages_truncated_total").(*monitoring.Uint).Get(), "messages_truncated_total")
 	require.Equal(t, expected.BytesProcessed, reg.Get("bytes_processed_total").(*monitoring.Uint).Get(), "bytes_processed_total")
 	require.Equal(t, expected.EventsProcessed, reg.Get("events_processed_total").(*monitoring.Uint).Get(), "events_processed_total")
 	require.Equal(t, expected.ProcessingErrors, reg.Get("processing_errors_total").(*monitoring.Uint).Get(), "processing_errors_total")
diff --git a/libbeat/reader/multiline/message_buffer.go b/libbeat/reader/multiline/message_buffer.go
index 506efc7599dd..064dbc4fce79 100644
--- a/libbeat/reader/multiline/message_buffer.go
+++ b/libbeat/reader/multiline/message_buffer.go
@@ -120,11 +120,11 @@ func (b *messageBuffer) addLine(m reader.Message) {
 // finalize writes the existing content into the returned message and resets all reader variables.
 func (b *messageBuffer) finalize() reader.Message {
 	if b.truncated > 0 {
-		b.message.AddFlagsWithKey("log.flags", "truncated")
+		b.message.AddFlagsWithKey("log.flags", "truncated") //nolint:errcheck // It is safe to ignore the error.
 	}
 
 	if b.numLines > 1 {
-		b.message.AddFlagsWithKey("log.flags", "multiline")
+		b.message.AddFlagsWithKey("log.flags", "multiline") //nolint:errcheck // It is safe to ignore the error.
 	}
 
 	// Copy message from existing content