Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue133 - accepting non string headers. #139

Merged
merged 4 commits into from
Dec 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 36 additions & 23 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ care of::

debuild -uc -us

will build a package you can install. Otherwise just *make*, and you need
will build a package you can install. Otherwise just *make*, and you need
to install the bits yourself.


Expand All @@ -54,28 +54,7 @@ This function takes the same options as
`sr3_post <https://metpx.github.io/sarracenia/Reference/sr3_post.1.html>`_.

but the *sleep* argument, when supplied causes it to loop, checking for new
items every *sleep* seconds (equivalent to sr3_watch.) There is also a sample consumer::

sr3_cpump

which obtains messages and, by default, prints them to standard output in json
format identical the the format used by the python implementation for
save/restore.

`sr3_cpump <https://metpx.github.io/sarracenia/Reference/sr3_cpump.1.html>`_.

In order to have a complete downloader, one needs a script to
parse the json output and invoke an appropriate binary downloader. One can
use the 'outlet' switch to choose other formats:

json:
the default format, json compatible with python save/restore.

post:
turns sr3_cpump into an sr3 shovel, if cache is on, then it is a winnow.

url:
just print out the retrieval urls, rather than the entire message
items every *sleep* seconds (equivalent to sr3_watch.)

There is also an LD_PRELOAD shim library. (libsr3shim.c) that uses the posting
API, this is to be used in `very high volume use cases <https://github.com/MetPX/sarracenia/blob/main/doc/hpc_mirroring_use_case.rst>`_
Expand Down Expand Up @@ -159,6 +138,40 @@ fields present:
* 504576 pid of the process doing the logging.
* 0.0270023 elapsed wallclock time of the process since it started (in seconds.)

Lastly, There is also a sample consumer::

sr3_cpump

which obtains messages and, by default, prints them to standard output in v03
format.

`sr3_cpump <https://metpx.github.io/sarracenia/Reference/sr3_cpump.1.html>`_.

In order to have a complete downloader, one needs a script to
parse the json output and invoke an appropriate binary downloader. One can
use the 'outlet' switch to choose other formats:

json:
the default format, json compatible with python save/restore.

post:
turns sr3_cpump into an sr3 shovel, if cache is on, then it is a winnow.

url:
just print out the retrieval urls, rather than the entire message

NOTE:

* The posting logic (sr3_cpost and the library) are the focus of the implementation.
They fully work.

* The consumer logic in C is functional, but not completely robust.
It should not be used in operations, but is more of a technology demonstrator.
It acknowledges receipt of messages before application processing has completed
( https://github.com/MetPX/sarrac/issues/121 )

We have flow tests demonstrating failure modes and message loss.
The tests are disabled for now because the C consumer does lose messages.


Source Code Documentation
Expand Down
2 changes: 1 addition & 1 deletion shim_copy_strip_slash.sh
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ nodupe_ttl 0
header toto=pig
events modify,link,delete,mkdir,rmdir

post_baseUrl sftp://${USER}@localhost
post_baseUrl sftp://${USER}@localhost/
post_topicPrefix v03.post

accept `realpath .`/.*
Expand Down
6 changes: 4 additions & 2 deletions sr_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ struct sr_header_s *sr_headers_copy(struct sr_header_s *o)
i = (struct sr_header_s *)malloc(sizeof(struct sr_header_s));
i->key = strdup(o->key);
i->value = strdup(o->value);
i->is_numeric = o->is_numeric;
i->next = n;
n = i;
}
Expand Down Expand Up @@ -460,7 +461,7 @@ int sr_add_decl(struct sr_config_s *cfg, char *what, char *s)

}

int sr_add_header(struct sr_config_s *cfg, char *s)
int sr_add_header(struct sr_config_s *cfg, char *s, bool is_numeric)
/*
Add a (user defined) header to the list of existing ones.
see StrinIsTrue for explanation of bitmask return values.
Expand All @@ -483,6 +484,7 @@ int sr_add_header(struct sr_config_s *cfg, char *s)
*eq = '=';
new_header->value = strdup(eq + 1);
new_header->next = cfg->user_headers;
new_header->is_numeric = is_numeric;
cfg->user_headers = new_header;
return (3);
}
Expand Down Expand Up @@ -958,7 +960,7 @@ int sr_config_parse_option(struct sr_config_s *sr_cfg, char *option, char *arg,
retval = (1);

} else if (!strcmp(option, "header")) {
val = sr_add_header(sr_cfg, argument);
val = sr_add_header(sr_cfg, argument, false);
retval = (1 + (val & 1));

} else if (!strcmp(option, "logrotate") || !strcmp(option, "lr")
Expand Down
2 changes: 2 additions & 0 deletions sr_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ struct sr_header_s {
/**< the key string */
char *value;
/**< the value string */
bool is_numeric;
/**< whether it is a numeric value that should not be quoted on output. */
struct sr_header_s *next;
/**< link to the next item in the singly linked list. */
};
Expand Down
67 changes: 49 additions & 18 deletions sr_consume.c
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ static void assign_field(const char *key, char *value)
h = (struct sr_header_s *)malloc(sizeof(struct sr_header_s));
h->key = strdup(key);
h->value = strdup(value);
h->is_numeric = false;
h->next = msg.user_headers;
msg.user_headers = h;
}
Expand Down Expand Up @@ -521,6 +522,7 @@ static void v03assign_field(const char *key, json_object * jso_v)
h->key = strdup("oldname");
h->value = strdup(v);
h->next = msg.user_headers;
h->is_numeric = false;
msg.user_headers = h;
}
} else if (!strcmp(key,"identity") || (!strcmp(key, "integrity"))) {
Expand Down Expand Up @@ -592,16 +594,21 @@ static void v03assign_field(const char *key, json_object * jso_v)
} else {
h->value = strdup(unsupported);
}
h->is_numeric = false;
h->next = msg.user_headers;
msg.user_headers = h;
}
}

#endif

static void json_dump_strheader(char *tag, char *value)
static void json_dump_strheader(char *tag, char *value, bool is_numeric)
{
printf("\"%s\": \"%s\"", tag, value);
if (is_numeric) {
printf("\"%s\": %s", tag, value);
} else {
printf("\"%s\": \"%s\"", tag, value);
}
}

char *sr_message_2log(struct sr_message_s *m)
Expand Down Expand Up @@ -684,23 +691,23 @@ void sr_message_2json(struct sr_message_s *m)

printf("[");
printf(" \"%s\", { ", m->routing_key);
json_dump_strheader("atime", m->atime);
json_dump_strheader("atime", m->atime, false);
printf(", ");
printf("\"mode\": \"%04o\"", m->mode);
printf(", ");
json_dump_strheader("mtime", m->mtime);
json_dump_strheader("mtime", m->mtime, false);
printf(", ");
printf("\"parts\": \"%c,%ld,%ld,%ld,%ld\"",
m->parts_s, m->parts_blksz, m->parts_blkcount, m->parts_rem, m->parts_num);
printf(", ");
json_dump_strheader("source", m->source);
json_dump_strheader("source", m->source, false);
printf(", ");
json_dump_strheader("sum", m->sum);
json_dump_strheader("sum", m->sum, false);
printf(", ");

for (h = msg.user_headers; h; h = h->next) {
printf(", ");
json_dump_strheader(h->key, h->value);
json_dump_strheader(h->key, h->value, h->is_numeric);
}
printf(" } \"%s %s %s\"", m->datestamp, m->url, m->relPath);
printf("]\n");
Expand Down Expand Up @@ -941,31 +948,55 @@ struct sr_message_s *sr_consume(struct sr_context *sr_c)
break;

case AMQP_FIELD_KIND_U64:
sprintf(tag, "%.*s",
(int)p->headers.entries[i].key.len,
(char *)p->headers.entries[i].key.bytes);

sprintf(value, "%lu", p->headers.entries[i].value.value.u64 );

assign_field(tag, value);
sr_log_msg(LOG_WARNING,
"skipping U64 header %d value:%lld\n", i,
"skipping U64 header %d value:%ld\n", i,
(long long unsigned)(p->headers.entries[i].value.
value.u64));
goto after_headers;
break;

case AMQP_FIELD_KIND_ARRAY:
sr_log_msg(LOG_WARNING, "skipping ARRAY header index: %d\n",
i);
sprintf(tag, "%.*s",
(int)p->headers.entries[i].key.len,
(char *)p->headers.entries[i].key.bytes);

sr_log_msg(LOG_WARNING, "skipping ARRAY header %s index: %d\n", tag, i);
goto after_headers;
break;

case AMQP_FIELD_KIND_I32:
sprintf(tag, "%.*s",
(int)p->headers.entries[i].key.len,
(char *)p->headers.entries[i].key.bytes);

sprintf(value, "%u", p->headers.entries[i].value.value.u32 );
assign_field(tag, value);
break;

case AMQP_FIELD_KIND_I64:
sr_log_msg(LOG_WARNING,
"skipping I64 header %d: value:%lld\n", i,
(long long)(p->headers.entries[i].value.value.
i64));
goto after_headers;
sprintf(tag, "%.*s",
(int)p->headers.entries[i].key.len,
(char *)p->headers.entries[i].key.bytes);

sprintf(value, "%ld", p->headers.entries[i].value.value.i64 );
assign_field(tag, value);

break;

default:
sprintf(tag, "%.*s",
(int)p->headers.entries[i].key.len,
(char *)p->headers.entries[i].key.bytes);

sr_log_msg(LOG_WARNING,
"skipping non UTF8 headers: amount: %d, this one: %d, kind:%d\n",
p->headers.num_entries, i,
"skipping non UTF8 headers: %s, amount: %d, this one: %d, kind:%d\n",
tag, p->headers.num_entries, i,
p->headers.entries[i].value.kind);
goto after_headers;

Expand Down