00001
#include <ctype.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "gridlabd.h"
#include "object.h"
#include "aggregate.h"

#include "tape.h"
#include "file.h"
#include "odbc.h"
00035
/* Class handle handed to gl_create_object() whenever a collector is created. */
CLASS *collector_class = NULL;
/* Most recently created collector object; updated by create_collector(). */
static OBJECT *last_collector = NULL;
00038
00039 EXPORT int create_collector(OBJECT **obj, OBJECT *parent)
00040 {
00041 *obj = gl_create_object(collector_class);
00042 if (*obj!=NULL)
00043 {
00044 struct collector *my = OBJECTDATA(*obj,struct collector);
00045 last_collector = *obj;
00046 gl_set_parent(*obj,parent);
00047 strcpy(my->file,"");
00048 strcpy(my->filetype,"txt");
00049 strcpy(my->delim,",");
00050 strcpy(my->property,"(undefined)");
00051 strcpy(my->group,"");
00052 my->interval = TS_NEVER;
00053 my->dInterval = -1.0;
00054 my->last.ts = -1;
00055 strcpy(my->last.value,"");
00056 my->limit = 0;
00057 my->samples = 0;
00058 my->status = TS_INIT;
00059 my->trigger[0]='\0';
00060 my->format = 0;
00061 my->aggr = NULL;
00062 return 1;
00063 }
00064 return 0;
00065 }
00066
00067 static int collector_open(OBJECT *obj)
00068 {
00069 char32 type="file";
00070 char1024 fname="";
00071 char32 flags="w";
00072 struct collector *my = OBJECTDATA(obj,struct collector);
00073
00074 my->interval = (int64)(my->dInterval/TS_SECOND);
00075
00076
00077 if (sscanf(my->file,"%32[^:]:%1024[^:]:%[^:]",type,fname,flags)==1)
00078 {
00079
00080 strcpy(fname,my->file);
00081 strcpy(type,"file");
00082 }
00083
00084
00085 if (strcmp(fname,"")==0)
00086 {
00087 char *p;
00088
00089 sprintf(fname,"%s.%s",my->group,my->filetype);
00090
00091
00092 for (p=fname; *p!='\0'; p++)
00093 {
00094 if (!isalnum(*p) && *p!='-' && *p!='.')
00095 *p='_';
00096 }
00097 }
00098
00099
00100 my->ops = get_ftable(type)->collector;
00101 if(my->ops == NULL)
00102 return 0;
00103 return my->ops->open(my, fname, flags);
00104 }
00105
00106 static int write_collector(struct collector *my, char *ts, char *value)
00107 {
00108 return my->ops->write(my, ts, value);
00109 }
00110
00111 static void close_collector(struct collector *my){
00112 if(my->ops){
00113 my->ops->close(my);
00114 }
00115 }
00116
00117 static TIMESTAMP collector_write(OBJECT *obj)
00118 {
00119 struct collector *my = OBJECTDATA(obj,struct collector);
00120 char ts[64];
00121 if (my->format==0)
00122 {
00123
00124
00125
00126 DATETIME dt;
00127 gl_localtime(my->last.ts, &dt);
00128 gl_strtime(&dt, ts, sizeof(ts));
00129 }
00130 else
00131 sprintf(ts,"%" FMT_INT64 "d", my->last.ts);
00132 if ((my->limit>0 && my->samples>my->limit)
00133 || write_collector(my,ts,my->last.value)==0)
00134 {
00135 if (my->ops){
00136 close_collector(my);
00137 } else {
00138 gl_error("collector_write: no TAPEOP structure when closing the tape");
00139 }
00140 my->status = TS_DONE;
00141 }
00142 else
00143 my->samples++;
00144 return TS_NEVER;
00145 }
00146
00147 AGGREGATION *link_aggregates(char *aggregate_list, char *group)
00148 {
00149 char *item;
00150 AGGREGATION *first=NULL, *last=NULL;
00151 char1024 list;
00152 strcpy(list,aggregate_list);
00153 for (item=strtok(list,","); item!=NULL; item=strtok(NULL,","))
00154 {
00155 AGGREGATION *aggr = gl_create_aggregate(item,group);
00156 if (aggr!=NULL)
00157 {
00158
00159 if (first==NULL) first=aggr; else last->next=aggr;
00160 last=aggr;
00161 aggr->next = NULL;
00162 }
00163 else
00164 return NULL;
00165 }
00166 return first;
00167 }
00168
00169 int read_aggregates(AGGREGATION *aggr, char *buffer, int size)
00170 {
00171 AGGREGATION *p;
00172 int offset=0;
00173 int count=0;
00174 char32 fmt;
00175
00176 gl_global_getvar("double_format", fmt, 32);
00177 for (p=aggr; p!=NULL && offset<size-33; p=p->next)
00178 {
00179 if (offset>0) strcpy(buffer+offset++,",");
00180 offset+=sprintf(buffer+offset,fmt,gl_run_aggregate(p));
00181 buffer[offset]='\0';
00182 count++;
00183 }
00184 return count;
00185 }
00186
00187
00188
00189 EXPORT TIMESTAMP sync_collector(OBJECT *obj, TIMESTAMP t0, PASSCONFIG pass)
00190 {
00191 struct collector *my = OBJECTDATA(obj,struct collector);
00192 typedef enum {NONE='\0', LT='<', EQ='=', GT='>'} COMPAREOP;
00193 COMPAREOP comparison;
00194 char1024 buffer = "";
00195
00196 if(my->status == TS_DONE){
00197 return TS_NEVER;
00198 }
00199
00200
00201 if (my->aggr==NULL)
00202 my->aggr = link_aggregates(my->property,my->group);
00203
00204
00205 if (my->aggr==NULL)
00206 {
00207 sprintf(buffer,"'%s' contains an aggregate that is not found in the group '%s'", my->property, my->group);
00208 my->status = TS_ERROR;
00209 goto Error;
00210 }
00211
00212 if((my->status == TS_OPEN) && (t0 > obj->clock)){
00213 obj->clock = t0;
00214 if((my->interval > 0) && (my->last.ts < t0) && (my->last.value[0] != 0)){
00215 collector_write(obj);
00216
00217 my->last.value[0] = 0;
00218 }
00219 }
00220
00221
00222 if(my->aggr != NULL && (my->interval == 0 || my->interval == -1)){
00223 if(read_aggregates(my->aggr,buffer,sizeof(buffer))==0)
00224 {
00225 sprintf(buffer,"unable to read aggregate '%s' of group '%s'", my->property, my->group);
00226 close_collector(my);
00227 my->status = TS_ERROR;
00228 }
00229 }
00230
00231 if(my->aggr != NULL && my->interval > 0){
00232 if((t0 >= my->last.ts + my->interval) || (t0 == my->last.ts)){
00233 if(read_aggregates(my->aggr,buffer,sizeof(buffer))==0){
00234 sprintf(buffer,"unable to read aggregate '%s' of group '%s'", my->property, my->group);
00235 close_collector(my);
00236 my->status = TS_ERROR;
00237 }
00238 my->last.ts = t0;
00239 }
00240 }
00241
00242
00243 comparison = (COMPAREOP)my->trigger[0];
00244 if (comparison!=NONE)
00245 {
00246 int desired = comparison==LT ? -1 : (comparison==EQ ? 0 : (comparison==GT ? 1 : -2));
00247
00248
00249 int actual = strcmp(buffer,my->trigger+1);
00250 if (actual!=desired || (my->status==TS_INIT && !collector_open(obj))){
00251
00252 return (my->interval==0 || my->interval==-1) ? TS_NEVER : t0+my->interval;
00253 }
00254 }
00255 else if (my->status==TS_INIT && !collector_open(obj)){
00256 close_collector(my);
00257 return TS_NEVER;
00258 }
00259
00260 if(my->last.ts < 1 && my->interval != -1){
00261 my->last.ts = t0;
00262 }
00263
00264
00265 if(my->status == TS_OPEN){
00266 if(my->interval == 0
00267 || ((my->interval == -1) && my->last.ts != t0 && strcmp(buffer, my->last.value) != 0)
00268 ){
00269 strncpy(my->last.value, buffer, sizeof(my->last.value));
00270 my->last.ts = t0;
00271 collector_write(obj);
00272 } else if(my->interval > 0 && my->last.ts == t0){
00273 strncpy(my->last.value, buffer, sizeof(my->last.value));
00274 }
00275 }
00276 Error:
00277 if (my->status==TS_ERROR)
00278 {
00279 gl_error("collector %d %s\n",obj->id, buffer);
00280 my->status=TS_DONE;
00281 return 0;
00282 }
00283
00284 return (my->interval==0 || my->interval==-1) ? TS_NEVER : my->last.ts+my->interval;
00285 }
00286