Skip to content

Commit

Permalink
Merge pull request #40 from pubnub/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
MaxPresman authored Mar 5, 2018
2 parents 7444ac0 + a6b08b6 commit 023d23c
Show file tree
Hide file tree
Showing 53 changed files with 3,523 additions and 1,781 deletions.
6 changes: 5 additions & 1 deletion .pubnub.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
name: c-core
version: 2.2.14
version: 2.3.0
scm: github.com/pubnub/c-core
changelog:
- version: v2.3.0
changes:
- type: feature
text: Use HTTP Keep-alive, where possible
- version: v2.2.14
changes:
- type: bug
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.2.14
2.3.0
2 changes: 2 additions & 0 deletions core/pbhttp_digest.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
/* -*- c-file-style:"stroustrup"; indent-tabs-mode: nil -*- */
#include "pubnub_internal.h"

#include "pbhttp_digest.h"

#include "pbmd5.h"
Expand Down
71 changes: 71 additions & 0 deletions core/pbpal_ntf_callback_admin.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/* -*- c-file-style:"stroustrup"; indent-tabs-mode: nil -*- */
#include "pubnub_internal.h"

#include "pbntf_trans_outcome_common.h"

#include "pubnub_log.h"
#include "pubnub_assert.h"


void pbntf_trans_outcome(pubnub_t* pb)
{
PBNTF_TRANS_OUTCOME_COMMON(pb);
if (pb->cb != NULL) {
pb->cb(pb, pb->trans, pb->core.last_result, pb->user_data);
}
}


enum pubnub_res pubnub_last_result(pubnub_t* pb)
{
enum pubnub_res rslt;

PUBNUB_ASSERT(pb_valid_ctx_ptr(pb));

pubnub_mutex_lock(pb->monitor);
rslt = pb->core.last_result;
pubnub_mutex_unlock(pb->monitor);

return rslt;
}


enum pubnub_res pubnub_register_callback(pubnub_t* pb,
pubnub_callback_t cb,
void* user_data)
{
PUBNUB_ASSERT(pb_valid_ctx_ptr(pb));
pubnub_mutex_lock(pb->monitor);
pb->cb = cb;
pb->user_data = user_data;
pubnub_mutex_unlock(pb->monitor);
return PNR_OK;
}


void* pubnub_get_user_data(pubnub_t* pb)
{
void* result;

PUBNUB_ASSERT(pb_valid_ctx_ptr(pb));

pubnub_mutex_lock(pb->monitor);
result = pb->user_data;
pubnub_mutex_unlock(pb->monitor);

return result;
}


pubnub_callback_t pubnub_get_callback(pubnub_t* pb)
{
pubnub_callback_t result;

PUBNUB_ASSERT(pb_valid_ctx_ptr(pb));

pubnub_mutex_lock(pb->monitor);
result = pb->cb;
pubnub_mutex_unlock(pb->monitor);

return result;
}
44 changes: 44 additions & 0 deletions core/pbpal_ntf_callback_handle_timer_list.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/* -*- c-file-style:"stroustrup"; indent-tabs-mode: nil -*- */
#include "pubnub_internal.h"

#include "pbpal_ntf_callback_handle_timer_list.h"

#include "pubnub_timer_list.h"
#include "pubnub_assert.h"


void pbntf_handle_timer_list(int ms_elapsed, pubnub_t** head)
{
pubnub_t* expired;

PUBNUB_ASSERT_OPT(head != NULL);
PUBNUB_ASSERT_OPT(ms_elapsed > 0);

expired = pubnub_timer_list_as_time_goes_by(head, ms_elapsed);
while (expired != NULL) {
pubnub_t* next;

pubnub_mutex_lock(expired->monitor);
next = expired->next;
pbnc_stop(expired, PNR_TIMEOUT);
pubnub_mutex_unlock(expired->monitor);

expired->next = NULL;
expired->previous = NULL;
expired = next;
}
}


void pbpal_remove_timer_safe(pubnub_t* to_remove, pubnub_t** from_head)
{
PUBNUB_ASSERT_OPT(to_remove != NULL);
PUBNUB_ASSERT_OPT(from_head != NULL);

if (PUBNUB_TIMERS_API) {
if ((to_remove->previous != NULL) || (to_remove->next != NULL)
|| (to_remove == *from_head)) {
*from_head = pubnub_timer_list_remove(*from_head, to_remove);
}
}
}
16 changes: 16 additions & 0 deletions core/pbpal_ntf_callback_handle_timer_list.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/* -*- c-file-style:"stroustrup"; indent-tabs-mode: nil -*- */
typedef struct pubnub_ pubnub_t;

/** Checks the timer list with the given @p head for any expired
timers, assuming that @p ms_elapsed since last check.
For all expired, the context FSM will be called to handle
the timeout.
*/
void pbntf_handle_timer_list(int ms_elapsed, pubnub_t** head);

/** Removes the context @p to_remove @p from_head list, in a "safe"
manner. That is, it handles ("ignores") if @p to_remove is not in
@p from_head.
*/
void pbpal_remove_timer_safe(pubnub_t* to_remove, pubnub_t** from_head);
78 changes: 78 additions & 0 deletions core/pbpal_ntf_callback_poller.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/* -*- c-file-style:"stroustrup"; indent-tabs-mode: nil -*- */
#if !defined INC_PBPAL_NTF_CALLBACK_POLLER
#define INC_PBPAL_NTF_CALLBACK_POLLER


/** @file pubnub_ntf_callback_poller.h
This is the interface of a "poller". A poller is a "group checker"
for statuses of sockets or other connection handles (let's call
them sockets for short, because that's the mainstream). We can add
and remove sockets from this group, change what to "watch for" and such.
Of course, we can actually, well, poll - get the current status of
what sockets are "ready".
The administration/book-keeping part of this is somewhat generic,
though there are simple differences to handle. Tut the "poll"
itself is not. Even the mainstream has many variants of these -
the BSD sockets themselves have poll() and select(), Unices in
general have signal based I/O (with various degrees of usability
issues), Linux has also epoll(), BSD Unices have kqueue, some
other have `/dev/poll`... Windows has I/O completion ports and the
older and seemingly "out of fashion" completion callbacks w/APC
(Async Procedure Calls) - also Windows poll() is slightly
different and is called WSAPoll().
Of course, other networking APIs have similar, yet "different
enough" interfaces.
So, it's a jungle, really, but, they all basically provide the same
interface and here we have it's definition for our purpose.
*/


struct pbpal_poll_data;
typedef struct pubnub_ pubnub_t;


/** Allocate and Initialize the poller data */
struct pbpal_poll_data* pbpal_ntf_callback_poller_init(void);

/** Add the Pubnub context @p pb to the poll-set @p data */
void pbpal_ntf_callback_save_socket(struct pbpal_poll_data* data, pubnub_t* pb);

/** Remove the Pubnub context @p pb from the poll-set @p data */
void pbpal_ntf_callback_remove_socket(struct pbpal_poll_data* data, pubnub_t* pb);

/** Update the information about the Pubnub context @p pb int the
poll-set @p data. Essentially, this is used when the socket
(connection handle) changes, for some reason.
*/
void pbpal_ntf_callback_update_socket(struct pbpal_poll_data* data, pubnub_t* pb);

/** Watch for "out" events ("can write") on @p pbp context in poll-set
@p data.
*/
int pbpal_ntf_watch_out_events(struct pbpal_poll_data* data, pubnub_t* pbp);

/** Watch for "in" events ("can read") on @p pbp context in poll-set
@p data.
*/
int pbpal_ntf_watch_in_events(struct pbpal_poll_data* data, pubnub_t* pbp);

/** Do the polling and queue any events to process.
Maybe it could return (give back) the contexts that need
processing instead, but, if there are many, that would slow things
down... Another option would be to pass a function pointer here
that this function would call for each context that needs
processing - but, we basically know what we want to do and it's
not configurable.
*/
int pbpal_ntf_poll_away(struct pbpal_poll_data* data, int ms);

/** Deinitialize and deellocate the poller data */
void pbpal_ntf_callback_poller_deinit(struct pbpal_poll_data** data);


#endif /* !defined INC_PBPAL_NTF_CALLBACK_POLLER */
117 changes: 117 additions & 0 deletions core/pbpal_ntf_callback_queue.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/* -*- c-file-style:"stroustrup"; indent-tabs-mode: nil -*- */
#include "pubnub_internal.h"

#include "pbpal_ntf_callback_queue.h"

#include "pubnub_assert.h"


void pbpal_ntf_callback_queue_init(struct pbpal_ntf_callback_queue* queue)
{
pubnub_mutex_init(queue->monitor);
queue->size = sizeof queue->apb / sizeof queue->apb[0];
queue->head = queue->tail = 0;
}


void pbpal_ntf_callback_queue_deinit(struct pbpal_ntf_callback_queue* queue)
{
pubnub_mutex_destroy(queue->monitor);
queue->head = queue->tail = 0;
}


int pbpal_ntf_callback_enqueue_for_processing(struct pbpal_ntf_callback_queue* queue,
pubnub_t* pb)
{
int result;
size_t next_head;

PUBNUB_ASSERT_OPT(queue != NULL);
PUBNUB_ASSERT_OPT(pb != NULL);

pubnub_mutex_lock(queue->monitor);
next_head = queue->head + 1;
if (next_head == queue->size) {
next_head = 0;
}
if (next_head != queue->tail) {
queue->apb[queue->head] = pb;
queue->head = next_head;
result = +1;
}
else {
result = -1;
}
pubnub_mutex_unlock(queue->monitor);

return result;
}


int pbpal_ntf_callback_requeue_for_processing(struct pbpal_ntf_callback_queue* queue,
pubnub_t* pb)
{
bool found = false;
size_t i;

PUBNUB_ASSERT_OPT(pb != NULL);

pubnub_mutex_lock(queue->monitor);
for (i = queue->tail; i != queue->head;
i = (((i + 1) == queue->size) ? 0 : i + 1)) {
if (queue->apb[i] == pb) {
found = true;
break;
}
}
pubnub_mutex_unlock(queue->monitor);

return !found ? pbpal_ntf_callback_enqueue_for_processing(queue, pb) : 0;
}


void pbpal_ntf_callback_remove_from_queue(struct pbpal_ntf_callback_queue* queue,
pubnub_t* pb)
{
size_t i;

PUBNUB_ASSERT_OPT(queue != NULL);
PUBNUB_ASSERT_OPT(pb != NULL);

pubnub_mutex_lock(queue->monitor);
for (i = queue->tail; i != queue->head;
i = (((i + 1) == queue->size) ? 0 : i + 1)) {
if (queue->apb[i] == pb) {
queue->apb[i] = NULL;
break;
}
}
pubnub_mutex_unlock(queue->monitor);
}


void pbpal_ntf_callback_process_queue(struct pbpal_ntf_callback_queue* queue)
{
pubnub_mutex_lock(queue->monitor);
while (queue->head != queue->tail) {
pubnub_t* pbp = queue->apb[queue->tail++];
if (queue->tail == queue->size) {
queue->tail = 0;
}
if (pbp != NULL) {
pubnub_mutex_unlock(queue->monitor);
pubnub_mutex_lock(pbp->monitor);
if (pbp->state == PBS_NULL) {
pubnub_mutex_unlock(pbp->monitor);
pballoc_free_at_last(pbp);
}
else {
pbnc_fsm(pbp);
pubnub_mutex_unlock(pbp->monitor);
}
pubnub_mutex_lock(queue->monitor);
}
}
pubnub_mutex_unlock(queue->monitor);
}
Loading

0 comments on commit 023d23c

Please sign in to comment.