Skip to content

Commit

Permalink
Sketch out implementation for Go custom plugins to support OTel exten…
Browse files Browse the repository at this point in the history
…sions
  • Loading branch information
robskillington committed Oct 7, 2024
1 parent 58c1330 commit 8d46b52
Show file tree
Hide file tree
Showing 8 changed files with 309 additions and 1 deletion.
5 changes: 5 additions & 0 deletions build_go_extensions.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/bash

# !! TODO: move to a Makefile

CGO_CFLAGS="-I${HOME}/src/fluent-bit/include -I${HOME}/src/fluent-bit/lib/monkey/include -I${HOME}/src/fluent-bit/build/lib/monkey/include/monkey -I${HOME}/src/fluent-bit/lib/cfl/include -I${HOME}/src/fluent-bit/lib/cfl/lib/xxhash -I${HOME}/src/fluent-bit/lib/cmetrics/include -I${HOME}/src/fluent-bit/lib/flb_libco -I${HOME}/src/fluent-bit/lib/c-ares-1.33.1/include -I${HOME}/src/fluent-bit/lib/msgpack-c/include -I${HOME}/src/fluent-bit/lib/ctraces/include -I${HOME}/src/fluent-bit/lib/ctraces/lib/mpack/src" CGO_LDFLAGS="-L${HOME}/src/fluent-bit/build/lib -lfluent-bit" sh -c 'echo CGO_CFLAGS=${CGO_CFLAGS}; echo CGO_LDFLAGS=${CGO_LDFLAGS}; go build -buildmode=c-shared -o build/bin/custom_extensions_go.so github.com/fluent/fluent-bit/plugins/custom_extensions_go'
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/fluent/fluent-bit

go 1.23.0
11 changes: 11 additions & 0 deletions include/fluent-bit/flb_custom.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,20 @@
#include <fluent-bit/flb_metrics.h>
#endif

/* Custom plugin types */
#define FLB_CUSTOM_PLUGIN_CORE 0
#define FLB_CUSTOM_PLUGIN_PROXY 1

struct flb_custom_instance;

struct flb_custom_plugin {
/*
* The type defines if this is a core-based plugin or it's handled by
* some specific proxy.
*/
int type;
void *proxy;

int flags; /* Flags (not available at the moment */
char *name; /* Custom plugin short name */
char *description; /* Description */
Expand Down
1 change: 1 addition & 0 deletions include/fluent-bit/flb_plugin_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
/* Plugin Types */
#define FLB_PROXY_INPUT_PLUGIN 1
#define FLB_PROXY_OUTPUT_PLUGIN 2
#define FLB_PROXY_CUSTOM_PLUGIN 3

/* Proxies available */
#define FLB_PROXY_GOLANG 11
Expand Down
103 changes: 103 additions & 0 deletions plugins/custom_extensions_go/extensions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Fluent Bit Go!
// ==============
// Copyright (C) 2024 The Fluent Bit Go Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package main

// #include <stdlib.h>
// #include "fluent-bit/flb_plugin.h"
// #include "fluent-bit/flb_plugin_proxy.h"
// #include "fluent-bit/flb_custom.h"
import "C"

import (
"fmt"
"time"
"unsafe"
)

// Define constants matching Fluent Bit core
const (
FLB_ERROR = C.FLB_ERROR
FLB_OK = C.FLB_OK
FLB_RETRY = C.FLB_RETRY

FLB_PROXY_CUSTOM_PLUGIN = C.FLB_CF_CUSTOM
FLB_PROXY_GOLANG = C.FLB_PROXY_GOLANG
)

// Local type to define a plugin definition
type (
FLBPluginProxyDef C.struct_flb_plugin_proxy_def
FLBCustomInstance C.struct_flb_custom_instance
)

// When the FLBPluginInit is triggered by Fluent Bit, a plugin context
// is passed and the next step is to invoke this FLBPluginRegister() function
// to fill the required information: type, proxy type, flags name and
// description.
//
//export FLBPluginRegister
func FLBPluginRegister(def unsafe.Pointer, name, desc string) int {
p := (*FLBPluginProxyDef)(def)
p._type = FLB_PROXY_CUSTOM_PLUGIN
p.proxy = FLB_PROXY_GOLANG
p.flags = 0
p.name = C.CString(name)
p.description = C.CString(desc)
return 0
}

// (fluentbit will call this)
// plugin (context) pointer to fluentbit context (state/ c code)
//
//export FLBPluginInit
func FLBPluginInit(plugin unsafe.Pointer) int {
extensions := FLBPluginConfigKey(plugin, "extensions")
fmt.Printf("[flb-go] extensions = '%s'\n", extensions)
go func() {
for {
fmt.Printf("[flb-go] Go extensions alive %v\n", time.Now())
time.Sleep(10 * time.Second)
}
}()
return FLB_OK
}

// Release resources allocated by the plugin initialization
//
//export FLBPluginUnregister
func FLBPluginUnregister(def unsafe.Pointer) {
p := (*FLBPluginProxyDef)(def)
C.free(unsafe.Pointer(p.name))
C.free(unsafe.Pointer(p.description))
}

//export FLBPluginExit
func FLBPluginExit() int {
return FLB_OK
}

func FLBPluginConfigKey(plugin unsafe.Pointer, key string) string {
k := C.CString(key)
p := plugin
v := C.GoString(C.flb_custom_get_property(k, (*C.struct_flb_custom_instance)(p)))
C.free(unsafe.Pointer(k))
return v
}

func main() {
}
82 changes: 82 additions & 0 deletions src/flb_plugin_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_plugin_proxy.h>
#include <fluent-bit/flb_input_log.h>
#include <fluent-bit/flb_custom.h>

/* Proxies */
#include "proxy/go/go.h"
Expand Down Expand Up @@ -418,6 +419,42 @@ static int flb_proxy_register_input(struct flb_plugin_proxy *proxy,
return 0;
}

int flb_proxy_custom_cb_init(struct flb_custom_instance *ins,
struct flb_config *config, void *data);

static int flb_proxy_custom_cb_exit(void *custom_context,
struct flb_config *config);

static int flb_proxy_register_custom(struct flb_plugin_proxy *proxy,
struct flb_plugin_proxy_def *def,
struct flb_config *config)
{
struct flb_custom_plugin *custom;

custom = flb_calloc(1, sizeof(struct flb_custom_plugin));
if (!custom) {
flb_errno();
return -1;
}

/* Plugin registration */
custom->type = FLB_CUSTOM_PLUGIN_PROXY;
custom->proxy = proxy;
custom->flags = def->flags;
custom->name = flb_strdup(def->name);
custom->description = def->description;
mk_list_add(&custom->_head, &config->custom_plugins);

/*
* Set proxy callbacks: external plugins which are not following
* the core plugins specs, have a different callback approach, so
* we put our proxy-middle callbacks to do the translation properly.
*/
custom->cb_init = flb_proxy_custom_cb_init;
custom->cb_exit = flb_proxy_custom_cb_exit;
return 0;
}

void *flb_plugin_proxy_symbol(struct flb_plugin_proxy *proxy,
const char *symbol)
{
Expand Down Expand Up @@ -483,6 +520,9 @@ int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy,
}
else if (def->type == FLB_PROXY_INPUT_PLUGIN) {
ret = proxy_go_input_register(proxy, def);
}
else if (def->type == FLB_PROXY_CUSTOM_PLUGIN) {
ret = proxy_go_custom_register(proxy, def);
}
#endif
}
Expand All @@ -497,6 +537,9 @@ int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy,
else if (def->type == FLB_PROXY_INPUT_PLUGIN) {
flb_proxy_register_input(proxy, def, config);
}
else if (def->type == FLB_PROXY_CUSTOM_PLUGIN) {
flb_proxy_register_custom(proxy, def, config);
}
}

return 0;
Expand Down Expand Up @@ -612,3 +655,42 @@ int flb_plugin_proxy_set(struct flb_plugin_proxy_def *def, int type,

return 0;
}

int flb_proxy_custom_cb_init(struct flb_custom_instance *ins,
struct flb_config *config, void *data)
{
int ret = -1;
struct flb_plugin_proxy_context *pc;
struct flb_plugin_proxy *proxy;

pc = (struct flb_plugin_proxy_context *)(ins->context);
proxy = pc->proxy;

if (proxy->def->proxy == FLB_PROXY_GOLANG) {
#ifdef FLB_HAVE_PROXY_GO
ret = proxy_go_custom_init(proxy);
#endif
}

return ret;
}

int flb_proxy_custom_cb_exit(void *custom_context,
struct flb_config *config)
{
int ret = -1;
struct flb_plugin_proxy_context *ctx = custom_context;
struct flb_plugin_proxy *proxy = (ctx->proxy);
if (!custom_context) {
return ret;
}

if (proxy->def->proxy == FLB_PROXY_GOLANG) {
#ifdef FLB_HAVE_PROXY_GO
ret = proxy_go_custom_destroy(ctx);
#endif
}

flb_free(ctx);
return ret;
}
87 changes: 86 additions & 1 deletion src/proxy/go/go.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <fluent-bit/flb_config.h>
#include <fluent-bit/flb_plugin_proxy.h>
#include <fluent-bit/flb_output.h>
#include <fluent-bit/flb_custom.h>
#include "./go.h"

/*
Expand All @@ -41,7 +42,7 @@
*
* - name: shortname of the plugin.
* - description: plugin description.
* - type: input, output, filter, whatever.
* - type: input, output, filter, processor, custom.
* - proxy: type of proxy e.g. GOLANG
* - flags: optional flags, not used by Go plugins at the moment.
*
Expand Down Expand Up @@ -286,3 +287,87 @@ void proxy_go_input_unregister(void *data) {
flb_free(plugin->name);
flb_free(plugin);
}

int proxy_go_custom_register(struct flb_plugin_proxy *proxy,
struct flb_plugin_proxy_def *def)
{
struct flbgo_custom_plugin *plugin;

plugin = flb_malloc(sizeof(struct flbgo_custom_plugin));
if (!plugin) {
return -1;
}

/*
* Lookup the entry point function:
*
* - FLBPluginInit
* - FLBPluginExit
*
* note: registration callback FLBPluginRegister() is resolved by the
* parent proxy interface.
*/

plugin->cb_init = flb_plugin_proxy_symbol(proxy, "FLBPluginInit");
if (!plugin->cb_init) {
flb_error("[go proxy]: could not load FLBPluginInit symbol");
flb_free(plugin);
return -1;
}

plugin->cb_exit = flb_plugin_proxy_symbol(proxy, "FLBPluginExit");

plugin->name = flb_strdup(def->name);

/* This Go plugin context is an opaque data for the parent proxy */
proxy->data = plugin;

return 0;
}

int proxy_go_custom_init(struct flb_plugin_proxy *proxy)
{
int ret = 0;
struct flbgo_custom_plugin *plugin = proxy->data;

/* set the API */
plugin->api = proxy->api;
plugin->i_ins = proxy->instance;
// In order to avoid having the whole instance as part of the ABI we
// copy the context pointer into the plugin.
plugin->context = ((struct flb_custom_instance *)proxy->instance)->context;

ret = plugin->cb_init(plugin);
if (ret <= 0) {
flb_error("[go proxy]: plugin '%s' failed to initialize",
plugin->name);
flb_free(plugin);
return -1;
}

return ret;
}

int proxy_go_custom_destroy(struct flb_plugin_proxy_context *ctx)
{
int ret = 0;
struct flbgo_custom_plugin *plugin;

plugin = (struct flbgo_custom_plugin *) ctx->proxy->data;
flb_debug("[GO] running exit callback");

if (plugin->cb_exit) {
ret = plugin->cb_exit();
}

return ret;
}

void proxy_go_custom_unregister(void *data) {
struct flbgo_custom_plugin *plugin;

plugin = (struct flbgo_custom_plugin *) data;
flb_free(plugin->name);
flb_free(plugin);
}

18 changes: 18 additions & 0 deletions src/proxy/go/go.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ struct flbgo_input_plugin {
int (*cb_exit)();
};

struct flbgo_custom_plugin {
char *name;
void *api;
void *i_ins;
struct flb_plugin_proxy_context *context;

int (*cb_init)();
int (*cb_exit)();
};

int proxy_go_output_register(struct flb_plugin_proxy *proxy,
struct flb_plugin_proxy_def *def);

Expand All @@ -69,4 +79,12 @@ int proxy_go_input_cleanup(struct flb_plugin_proxy *ctx,
void *allocated_data);
int proxy_go_input_destroy(struct flb_plugin_input_proxy_context *ctx);
void proxy_go_input_unregister(void *data);

int proxy_go_custom_register(struct flb_plugin_proxy *proxy,
struct flb_plugin_proxy_def *def);

int proxy_go_custom_init(struct flb_plugin_proxy *proxy);

int proxy_go_custom_destroy(struct flb_plugin_proxy_context *ctx);
void proxy_go_custom_unregister(void *data);
#endif

0 comments on commit 8d46b52

Please sign in to comment.