tape/collector.c

Go to the documentation of this file.
00001 
00023 #include <stdlib.h>
00024 #include <stdio.h>
00025 #include <errno.h>
00026 #include <ctype.h>
00027 #include "gridlabd.h"
00028 #include "object.h"
00029 #include "aggregate.h"
00030 
00031 #include "tape.h"
00032 #include "file.h"
00033 #include "odbc.h"
00034 
00035 CLASS *collector_class = NULL;
00036 static OBJECT *last_collector = NULL;
00037 
00038 EXPORT int create_collector(OBJECT **obj, OBJECT *parent)
00039 {
00040     *obj = gl_create_object(collector_class,sizeof(struct collector));
00041     if (*obj!=NULL)
00042     {
00043         struct collector *my = OBJECTDATA(*obj,struct collector);
00044         last_collector = *obj;
00045         gl_set_parent(*obj,parent);
00046         strcpy(my->file,"");
00047         strcpy(my->filetype,"txt");
00048         strcpy(my->delim,",");
00049         strcpy(my->property,"(undefined)");
00050         strcpy(my->group,"");
00051         my->interval = TS_NEVER; /* transients only */
00052         my->last.ts = -1;
00053         strcpy(my->last.value,"");
00054         my->limit = 0;
00055         my->samples = 0;
00056         my->status = TS_INIT;
00057         my->trigger[0]='\0';
00058         my->format = 0;
00059         my->aggr = NULL;
00060         return 1;
00061     }
00062     return 0;
00063 }
00064 
00065 static int collector_open(OBJECT *obj)
00066 {
00067     char32 type="file";
00068     char1024 fname="";
00069     char32 flags="w";
00070     struct collector *my = OBJECTDATA(obj,struct collector);
00071     
00072     /* if prefix is omitted (no colons found) */
00073     if (sscanf(my->file,"%32[^:]:%1024[^:]:%[^:]",type,fname,flags)==1)
00074     {
00075         /* filename is file by default */
00076         strcpy(fname,my->file);
00077         strcpy(type,"file");
00078     }
00079 
00080     /* if no filename given */
00081     if (strcmp(fname,"")==0)
00082     {
00083         char *p;
00084         /* use group spec as default file name */
00085         sprintf(fname,"%s.%s",my->group,my->filetype);
00086 
00087         /* but change disallowed characters to _ */
00088         for (p=fname; *p!='\0'; p++)
00089         {
00090             if (!isalnum(*p) && *p!='-' && *p!='.')
00091                 *p='_';
00092         }
00093     }
00094 
00095     /* if type is file or file is stdin */
00096     my->ops = get_ftable(type)->collector;
00097     if(my->ops == NULL)
00098         return 0;
00099     return my->ops->open(my, fname, flags);
00100     /*
00101     if (strcmp(type,"file")==0)
00102         return file_open_collector(my,fname,flags);
00103     else if (strcmp(type,"odbc")==0)
00104         return odbc_open_collector(my,fname,flags);
00105     else if (strcmp(type,"memory")==0)
00106         return memory_open_collector(my,fname,flags);
00107     else
00108         return 0;
00109     */
00110 }
00111 
00112 static int write_collector(struct collector *my, char *ts, char *value)
00113 {
00114     return my->ops->write(my, ts, value);
00115 
00116     switch (my->type) {
00117     case FT_FILE:
00118         return file_write_collector(my,ts,value);
00119         break;
00120     case FT_ODBC:
00121         return odbc_write_collector(my,ts,value);
00122         break;
00123     case FT_MEMORY:
00124         return memory_write_collector(my,ts,value);
00125         break;
00126     default:
00127         return 0;
00128         break;
00129     }
00130 }
00131 
00132 static TIMESTAMP collector_write(OBJECT *obj)
00133 {
00134     struct collector *my = OBJECTDATA(obj,struct collector);
00135     char ts[64];
00136     if (my->format==0)
00137     {
00138         time_t t = (time_t)(my->last.ts*TS_SECOND);
00139         strftime(ts,sizeof(ts),timestamp_format, gmtime(&t));
00140     }
00141     else
00142         sprintf(ts,"%" FMT_INT64 "d", my->last.ts);
00143     if ((my->limit>0 && my->samples>my->limit) /* limit reached */
00144         || write_collector(my,ts,my->last.value)==0) /* write failed */
00145     {
00146         switch (my->type) {
00147         case FT_FILE:
00148             file_close_collector(my);
00149             break;
00150         case FT_ODBC:
00151             odbc_close_collector(my);
00152             break;
00153         case FT_MEMORY:
00154             memory_close_collector(my);
00155             break;
00156         default:
00157             break;
00158         }
00159         my->status = TS_DONE;
00160     }
00161     else
00162         my->samples++;
00163     return TS_NEVER;
00164 }
00165 
00166 AGGREGATION *link_aggregates(char *aggregate_list, char *group)
00167 {
00168     char *item;
00169     AGGREGATION *first=NULL, *last=NULL;
00170     char1024 list;
00171     strcpy(list,aggregate_list); /* avoid destroying orginal list */
00172     for (item=strtok(list,","); item!=NULL; item=strtok(NULL,","))
00173     {
00174         AGGREGATION *aggr = gl_create_aggregate(item,group);
00175         if (aggr!=NULL)
00176         {
00177             /* TODO: ideally the aggregation group program from the previous should be reused */
00178             if (first==NULL) first=aggr; else last->next=aggr;
00179             last=aggr;
00180             aggr->next = NULL;
00181         }
00182         else
00183             return NULL;
00184     }
00185     return first;
00186 }
00187 
00188 int read_aggregates(AGGREGATION *aggr, char *buffer, int size)
00189 {
00190     AGGREGATION *p;
00191     int offset=0;
00192     int count=0;
00193     for (p=aggr; p!=NULL && offset<size-33; p=p->next)
00194     {
00195         if (offset>0) strcpy(buffer+offset++,",");
00196         offset+=sprintf(buffer+offset,"%lg",gl_run_aggregate(p));
00197         buffer[offset]='\0';
00198         count++;
00199     }
00200     return count;
00201 }
00202 
00203 EXPORT TIMESTAMP sync_collector(OBJECT *obj, TIMESTAMP t0, PASSCONFIG pass)
00204 {
00205     struct collector *my = OBJECTDATA(obj,struct collector);
00206     typedef enum {NONE='\0', LT='<', EQ='=', GT='>'} COMPAREOP;
00207     COMPAREOP comparison;
00208     char1024 buffer = "";
00209     
00210     if (my->status==TS_DONE)
00211         return TS_NEVER;
00212 
00213     /* connect to property */
00214     if (my->aggr==NULL)
00215         my->aggr = link_aggregates(my->property,my->group);
00216 
00217     /* read property */
00218     if (my->aggr==NULL)
00219     {
00220         sprintf(buffer,"'%s' contains an aggregate that is not found in the group '%s'", my->property, my->group);
00221         my->status = TS_ERROR;
00222     }
00223     else if (read_aggregates(my->aggr,buffer,sizeof(buffer))==0)
00224     {
00225         sprintf(buffer,"unable to read aggregate '%s' of group '%s'", my->property, my->group);
00226         my->status = TS_ERROR;
00227     }
00228 
00229     /* check trigger, if any */
00230     comparison = (COMPAREOP)my->trigger[0];
00231     if (comparison!=NONE)
00232     {
00233         int desired = comparison==LT ? -1 : (comparison==EQ ? 0 : (comparison==GT ? 1 : -2));
00234 
00235         /* if not trigger or can't get access */
00236         int actual = strcmp(buffer,my->trigger+1);
00237         if (actual!=desired || (my->status==TS_INIT && !collector_open(obj)))
00238 
00239             /* better luck next time */
00240             return TS_NEVER;
00241     }
00242     else if (my->status==TS_INIT && !collector_open(obj))
00243         return TS_NEVER;
00244 
00245     /* write tape */
00246     if (my->status==TS_OPEN)
00247     {   
00248         if (my->interval==0 /* sample on every pass */
00249             || ((my->interval==-1) && my->last.ts!=t0 && strcmp(buffer,my->last.value)!=0) /* sample only when value changes */
00250             || my->last.ts+my->interval<=t0) /* sample regularly */
00251         {
00252             my->last.ts = t0;
00253             strncpy(my->last.value,buffer,sizeof(my->last.value));
00254             collector_write(obj);
00255         }
00256     }
00257     else if (my->status==TS_ERROR)
00258     {
00259         gl_error("collector %d %s\n",obj->id, buffer);
00260         my->status=TS_DONE;
00261         return 0; /* failed */
00262     }
00263 
00264     return (my->interval==0 || my->interval==-1) ? TS_NEVER : t0+my->interval;
00265 }
00266 

GridLAB-DTM Version 1.0
An open-source project initiated by the US Department of Energy