#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <errno.h>
#include "Hello.h"
#include "HelloSubscriber.h"
/* Length of one statistics interval, in seconds. The number of samples seen
 * since the previous report is divided by this value to obtain the
 * "current samples/sec" figure. */
#define POLL_PERIOD_SEC 1
/* Yields the larger of a and b. The selected argument is evaluated twice,
 * so do not pass expressions with side effects. Only defined if no other
 * header has already provided it. */
#ifndef MACRO_MAX
#define MACRO_MAX(a,b) (((a) > (b)) ? (a) : (b))
#endif
/* 1024*1024 (1,048,576); divisor used when scaling the throughput figure. */
#define ONE_MEGABYTE (1024*1024)
/*
 * State shared between the data-reader callbacks and the statistics loop.
 * A pointer to an instance is handed to every callback as listener_data.
 */
typedef struct CallbackData_t {
    long sample_id;        /* Id parsed from the most recently processed sample */
    long payload_size;     /* Length of the first processed sample; 0 until one arrives */
    long sample_lost;      /* Number of "sample lost" notifications received */
    long sample_rcvd;      /* Number of samples processed so far */
    long sample_rcvd_max;  /* When non-zero, stop processing once sample_rcvd reaches it */
    int verbose;           /* Verbosity level consulted by the callbacks before printing */
} CallbackData;

/*
 * Resets every field of *ptr to zero, including the verbosity level.
 * Callers that want verbose output must set ptr->verbose afterwards.
 *
 * ptr: object to reset; must not be NULL.
 */
static void CallbackData_initialize(CallbackData *ptr) {
    ptr->sample_id = 0L;
    ptr->payload_size = 0L;
    ptr->sample_lost = 0L;
    ptr->sample_rcvd = 0L;
    ptr->sample_rcvd_max = 0L;
    ptr->verbose = 0;
}
/*
 * Records one received sample in the callback state.
 *
 * The leading characters of the sample (at most 10) are parsed as a decimal
 * sample id and stored in me->sample_id; if they do not start with a number
 * the previous id is kept. The received-sample counter is incremented, and
 * the payload size is latched from the first sample seen.
 *
 * me:       callback state to update; must not be NULL.
 * instance: NUL-terminated sample text; must not be NULL.
 */
static void process_data(
        CallbackData *me,
        const char * const instance) {
    char idChar[11];
    char *parseEnd = NULL;
    size_t idLen = 0;
    long parsedId;

    /* Copy at most 10 characters but stop at the terminator, so a sample
     * shorter than the id field is never read past its end. */
    while (idLen < sizeof(idChar) - 1 && instance[idLen] != '\0') {
        idChar[idLen] = instance[idLen];
        ++idLen;
    }
    idChar[idLen] = '\0';

    /* strtol clamps out-of-range values instead of invoking undefined
     * behavior; parseEnd == idChar means no digits were consumed. */
    parsedId = strtol(idChar, &parseEnd, 10);
    if (parseEnd != idChar) {
        me->sample_id = parsedId;
    }

    ++(me->sample_rcvd);

    /* The payload size is only measured once, on the first sample. */
    if (me->payload_size == 0) {
        me->payload_size = (long)strlen(instance);
    }
}
static void on_requested_deadline_missed(
void* listener_data,
CallbackData *me = (CallbackData *)listener_data;
if (me->verbose) {
printf("->Callback: requested deadline missed.\n");
}
}
static void on_requested_incompatible_qos(
void* listener_data,
CallbackData *me = (CallbackData *)listener_data;
if (me->verbose) {
printf("->Callback: requested incompatible Qos.\n");
}
}
static void on_sample_rejected(
void* listener_data,
CallbackData *me = (CallbackData *)listener_data;
if (me->verbose) {
printf("->Callback: sample rejected.\n");
}
}
static void on_liveliness_changed(
void* listener_data,
CallbackData *me = (CallbackData *)listener_data;
if (me->verbose) {
printf("->Callback: liveliness changed.\n");
}
}
static void on_sample_lost(
void* listener_data,
CallbackData *me = (CallbackData *)listener_data;
if (me->verbose) {
printf("->Callback: sample lost.\n");
}
++(me->sample_lost);
}
static void on_subscription_matched(
void* listener_data,
CallbackData *me = (CallbackData *)listener_data;
if (me->verbose) {
printf("->Callback: subscription matched.\n");
}
}
static void on_data_available(
void* listener_data,
CallbackData *me = (CallbackData *)listener_data;
int i;
if (me->sample_rcvd_max != 0 &&
(me->sample_rcvd >= me->sample_rcvd_max)) {
return;
}
if (me->verbose > 2) {
printf("->Callback: data available matched.\n");
}
if (helloReader == NULL) {
fprintf(stderr, "! Unable to narrow data reader into "
"DDS_StringDataReader\n");
return;
}
helloReader,
&data_seq,
&info_seq,
return;
fprintf(stderr, "! Unable to take data from data reader, "
"error %d\n", rc);
return;
}
for (i = 0; i < DDS_StringSeq_get_length(&data_seq); ++i) {
if (DDS_SampleInfoSeq_get_reference(&info_seq, i)->valid_data) {
process_data(me, (char *)DDS_StringSeq_get(&data_seq, i));
}
}
helloReader,
&data_seq,
&info_seq);
fprintf(stderr, "! Unable to return loan, error %d\n", rc);
}
}
/*
 * NOTE(review): the lines below are the body of a function whose header is
 * not present in this view. `participant`, `verbose`, `sample_count`,
 * `listener` and `data_reader` are used but not declared here; presumably
 * they are parameters or locals of that function -- confirm against the
 * full file.
 *
 * What the visible code does: resets the shared callback state, prints a
 * table header, then loops printing one statistics row per iteration
 * (elapsed seconds, total samples, total lost, lost since last row,
 * average samples/sec, current samples/sec, current throughput) until the
 * requested number of samples has been received or the sample id goes
 * backwards.
 */
CallbackData callback_data;
/* Id of the first sample observed; 0 means "no baseline taken yet". */
long stat_first_sequence_id = 0;
time_t time_now;
/* Re-assigned when the first sample is observed (see below). */
time_t start_time = time(NULL);
long stat_delta_time_sec;
/* Snapshot of the callback counters taken in the current iteration. */
long last_sample_id = 0;
long last_sample_lost = 0;
/* Snapshot carried over from the previous reported iteration. */
long prev_sample_id = 0;
long prev_sample_lost = 0;
long stat_total_samples = 0;
long stat_samples_lost;
float stat_total_sample_per_sec = 0.0;
float stat_current_sample_per_sec = 0.0;
float stat_throughput = 0;
CallbackData_initialize(&callback_data);
callback_data.verbose = verbose;
callback_data.sample_rcvd_max = sample_count;
if (verbose) {
printf("Creating the data reader...\n");
}
/* NOTE(review): the next two lines look like leftover arguments of a
 * data-reader creation call whose beginning and end are missing from this
 * view -- confirm against the full file. */
participant,
&listener,
/* NOTE(review): only an error message is visible here; no return or other
 * bail-out follows in this view, so execution would continue with a NULL
 * reader -- confirm whether a return statement was lost. */
if (data_reader == NULL) {
fprintf(stderr, "! Unable to create DDS data reader\n");
}
printf("\n");
printf("Sec.from|Total |Total Lost|Curr Lost |Average |Current |Current\n");
printf("start |samples |samples |samples |smpls/sec |smpls/sec |Mb/sec\n");
printf("--------+----------+----------+----------+----------+----------+----------\n");
/* NOTE(review): no sleep/wait is visible inside this loop, yet the
 * "current" rate below divides by POLL_PERIOD_SEC as if each iteration
 * lasted that long -- confirm whether a delay call was lost. */
for (;;) {
/* Stop once the requested number of samples has arrived (0 = unlimited). */
if (sample_count != 0 && (callback_data.sample_rcvd >= sample_count)) {
printf("\nReceived %ld samples.\n", callback_data.sample_rcvd);
break;
}
/* A sample id lower than the previous snapshot means ids are no longer
 * increasing, which invalidates the statistics; give up.
 * NOTE(review): the last message has no trailing newline. */
if (callback_data.sample_id < last_sample_id) {
printf("Detected multiple publishers, or the publisher was restarted.\n");
printf("If you have multiple publishers on the network or you restart\n");
printf("the publisher, the statistics produced won't be accurate.");
break;
}
/* Snapshot the counters updated by the callbacks. */
last_sample_id = callback_data.sample_id;
last_sample_lost = callback_data.sample_lost;
time_now = time(NULL);
/* Sample id still 0: nothing has been received yet. */
if (last_sample_id == 0) {
if (verbose) {
printf("No data...\n");
fflush(stdout);
}
continue;
}
/* First sample seen: record the baseline id and restart the clock, then
 * skip reporting for this iteration. */
if (stat_first_sequence_id == 0) {
stat_first_sequence_id = last_sample_id;
start_time = time(NULL);
prev_sample_id = last_sample_id;
prev_sample_lost = last_sample_lost;
continue;
}
stat_delta_time_sec = (long)(time_now - start_time);
/* Total received = ids elapsed since the baseline minus samples lost. */
stat_total_samples = last_sample_id - stat_first_sequence_id
- last_sample_lost;
/* NOTE(review): stat_delta_time_sec can be 0 when less than one second
 * has elapsed since the baseline; the float division then yields a
 * non-finite value. */
stat_total_sample_per_sec = (float)stat_total_samples / (float)stat_delta_time_sec;
stat_current_sample_per_sec = (float)(last_sample_id - prev_sample_id)
/ POLL_PERIOD_SEC;
stat_samples_lost = last_sample_lost - prev_sample_lost;
/* payload size (characters) * samples/sec * 8, scaled by ONE_MEGABYTE;
 * printed under the "Mb/sec" column. */
stat_throughput = callback_data.payload_size *
stat_current_sample_per_sec *
8.0f /
ONE_MEGABYTE;
printf("%8ld %10ld %10ld %10ld %10.2f %10.2f %10.2f\n",
stat_delta_time_sec,
stat_total_samples,
last_sample_lost,
stat_samples_lost,
stat_total_sample_per_sec,
stat_current_sample_per_sec,
stat_throughput);
fflush(stdout);
/* Remember this snapshot for the next iteration's deltas. */
prev_sample_id = last_sample_id;
prev_sample_lost = last_sample_lost;
}
}