00001
00024 #include <stdlib.h>
00025 #include <stdio.h>
00026 #include <errno.h>
00027 #include <ctype.h>
00028 #include "gridlabd.h"
00029 #include "object.h"
00030 #include "aggregate.h"
00031
00032 #include "tape.h"
00033 #include "file.h"
00034 #include "odbc.h"
00035
/* Class handle handed to gl_create_object() by create_collector(); NULL until
 * it is assigned somewhere outside this section (registration code not shown here). */
00036 CLASS *collector_class = NULL;
/* Most recently created collector object; updated by create_collector(). */
00037 static OBJECT *last_collector = NULL;
00038
00039 EXPORT int create_collector(OBJECT **obj, OBJECT *parent)
00040 {
00041 *obj = gl_create_object(collector_class);
00042 if (*obj!=NULL)
00043 {
00044 struct collector *my = OBJECTDATA(*obj,struct collector);
00045 last_collector = *obj;
00046 gl_set_parent(*obj,parent);
00047 strcpy(my->file,"");
00048 strcpy(my->filetype,"txt");
00049 strcpy(my->delim,",");
00050 strcpy(my->property,"(undefined)");
00051 strcpy(my->group,"");
00052 my->interval = TS_NEVER;
00053 my->dInterval = -1.0;
00054 my->last.ts = -1;
00055 strcpy(my->last.value,"");
00056 my->limit = 0;
00057 my->samples = 0;
00058 my->status = TS_INIT;
00059 my->trigger[0]='\0';
00060 my->format = 0;
00061 my->aggr = NULL;
00062 return 1;
00063 }
00064 return 0;
00065 }
00066
00067 static int collector_open(OBJECT *obj)
00068 {
00069 char32 type="file";
00070 char1024 fname="";
00071 char32 flags="w";
00072 TAPEFUNCS *tf = 0;
00073 struct collector *my = OBJECTDATA(obj,struct collector);
00074
00075 my->interval = (int64)(my->dInterval/TS_SECOND);
00076
00077
00078 if (sscanf(my->file,"%32[^:]:%1024[^:]:%[^:]",type,fname,flags)==1)
00079 {
00080
00081 strcpy(fname,my->file);
00082 strcpy(type,"file");
00083 }
00084
00085
00086 if (strcmp(fname,"")==0)
00087 {
00088 char *p;
00089
00090 sprintf(fname,"%s.%s",my->group,my->filetype);
00091
00092
00093 for (p=fname; *p!='\0'; p++)
00094 {
00095 if (!isalnum(*p) && *p!='-' && *p!='.')
00096 *p='_';
00097 }
00098 }
00099
00100
00101 tf = get_ftable(type);
00102 if(tf == NULL)
00103 return 0;
00104 my->ops = tf->collector;
00105 if(my->ops == NULL)
00106 return 0;
00107 set_csv_options();
00108 return my->ops->open(my, fname, flags);
00109 }
00110
00111 static int write_collector(struct collector *my, char *ts, char *value)
00112 {
00113 int rc=my->ops->write(my, ts, value);
00114 if ( (my->flush==0 || (my->flush>0 && my->flush%gl_globalclock==0)) && my->ops->flush!=NULL )
00115 my->ops->flush(my);
00116 return rc;
00117 }
00118
00119 static void close_collector(struct collector *my){
00120 if(my->ops){
00121 my->ops->close(my);
00122 }
00123 }
00124
00125 static TIMESTAMP collector_write(OBJECT *obj)
00126 {
00127 struct collector *my = OBJECTDATA(obj,struct collector);
00128 char ts[64];
00129 if (my->format==0)
00130 {
00131
00132
00133
00134 DATETIME dt;
00135 gl_localtime(my->last.ts, &dt);
00136 gl_strtime(&dt, ts, sizeof(ts));
00137 }
00138 else
00139 sprintf(ts,"%" FMT_INT64 "d", my->last.ts);
00140 if ((my->limit>0 && my->samples>my->limit)
00141 || write_collector(my,ts,my->last.value)==0)
00142 {
00143 if (my->ops){
00144 close_collector(my);
00145 } else {
00146 gl_error("collector_write: no TAPEOP structure when closing the tape");
00147 }
00148 my->status = TS_DONE;
00149 }
00150 else
00151 my->samples++;
00152 return TS_NEVER;
00153 }
00154
00155 AGGREGATION *link_aggregates(char *aggregate_list, char *group)
00156 {
00157 char *item;
00158 AGGREGATION *first=NULL, *last=NULL;
00159 char1024 list;
00160 strcpy(list,aggregate_list);
00161 for (item=strtok(list,","); item!=NULL; item=strtok(NULL,","))
00162 {
00163 AGGREGATION *aggr = gl_create_aggregate(item,group);
00164 if (aggr!=NULL)
00165 {
00166
00167 if (first==NULL) first=aggr; else last->next=aggr;
00168 last=aggr;
00169 aggr->next = NULL;
00170 }
00171 else
00172 return NULL;
00173 }
00174 return first;
00175 }
00176
00177 int read_aggregates(AGGREGATION *aggr, char *buffer, int size)
00178 {
00179 AGGREGATION *p;
00180 int offset=0;
00181 int count=0;
00182 char32 fmt;
00183
00184 gl_global_getvar("double_format", fmt, 32);
00185 for (p=aggr; p!=NULL && offset<size-33; p=p->next)
00186 {
00187 if (offset>0) strcpy(buffer+offset++,",");
00188 offset+=sprintf(buffer+offset,fmt,gl_run_aggregate(p));
00189 buffer[offset]='\0';
00190 count++;
00191 }
00192 return count;
00193 }
00194
00195
00196
00197 EXPORT TIMESTAMP sync_collector(OBJECT *obj, TIMESTAMP t0, PASSCONFIG pass)
00198 {
00199 struct collector *my = OBJECTDATA(obj,struct collector);
00200 typedef enum {NONE='\0', LT='<', EQ='=', GT='>'} COMPAREOP;
00201 COMPAREOP comparison;
00202 char1024 buffer = "";
00203
00204 if(my->status == TS_DONE){
00205 return TS_NEVER;
00206 }
00207
00208
00209 if (my->aggr==NULL)
00210 my->aggr = link_aggregates(my->property,my->group);
00211
00212
00213 if (my->aggr==NULL)
00214 {
00215 sprintf(buffer,"'%s' contains an aggregate that is not found in the group '%s'", my->property, my->group);
00216 my->status = TS_ERROR;
00217 goto Error;
00218 }
00219
00220 if((my->status == TS_OPEN) && (t0 > obj->clock)){
00221 obj->clock = t0;
00222 if((my->interval > 0) && (my->last.ts < t0) && (my->last.value[0] != 0)){
00223 collector_write(obj);
00224
00225 my->last.value[0] = 0;
00226 }
00227 }
00228
00229
00230 if(my->aggr != NULL && (my->interval == 0 || my->interval == -1)){
00231 if(read_aggregates(my->aggr,buffer,sizeof(buffer))==0)
00232 {
00233 sprintf(buffer,"unable to read aggregate '%s' of group '%s'", my->property, my->group);
00234 close_collector(my);
00235 my->status = TS_ERROR;
00236 }
00237 }
00238
00239 if(my->aggr != NULL && my->interval > 0){
00240 if((t0 >= my->last.ts + my->interval) || (t0 == my->last.ts)){
00241 if(read_aggregates(my->aggr,buffer,sizeof(buffer))==0){
00242 sprintf(buffer,"unable to read aggregate '%s' of group '%s'", my->property, my->group);
00243 close_collector(my);
00244 my->status = TS_ERROR;
00245 }
00246 my->last.ts = t0;
00247 }
00248 }
00249
00250
00251 comparison = (COMPAREOP)my->trigger[0];
00252 if (comparison!=NONE)
00253 {
00254 int desired = comparison==LT ? -1 : (comparison==EQ ? 0 : (comparison==GT ? 1 : -2));
00255
00256
00257 int actual = strcmp(buffer,my->trigger+1);
00258 if (actual!=desired || (my->status==TS_INIT && !collector_open(obj))){
00259
00260 return (my->interval==0 || my->interval==-1) ? TS_NEVER : t0+my->interval;
00261 }
00262 }
00263 else if (my->status==TS_INIT && !collector_open(obj)){
00264 close_collector(my);
00265 return TS_NEVER;
00266 }
00267
00268 if(my->last.ts < 1 && my->interval != -1){
00269 my->last.ts = t0;
00270 }
00271
00272
00273 if(my->status == TS_OPEN){
00274 if(my->interval == 0
00275 || ((my->interval == -1) && my->last.ts != t0 && strcmp(buffer, my->last.value) != 0)
00276 ){
00277 strncpy(my->last.value, buffer, sizeof(my->last.value));
00278 my->last.ts = t0;
00279 collector_write(obj);
00280 } else if(my->interval > 0 && my->last.ts == t0){
00281 strncpy(my->last.value, buffer, sizeof(my->last.value));
00282 }
00283 }
00284 Error:
00285 if (my->status==TS_ERROR)
00286 {
00287 gl_error("collector %d %s\n",obj->id, buffer);
00288 my->status=TS_DONE;
00289 return 0;
00290 }
00291
00292 return (my->interval==0 || my->interval==-1) ? TS_NEVER : my->last.ts+my->interval;
00293 }
00294