00001
00023 #include <stdlib.h>
00024 #include <stdio.h>
00025 #include <errno.h>
00026 #include <ctype.h>
00027 #include "gridlabd.h"
00028 #include "object.h"
00029 #include "aggregate.h"
00030
00031 #include "tape.h"
00032 #include "file.h"
00033 #include "odbc.h"
00034
00035 CLASS *collector_class = NULL;
00036 static OBJECT *last_collector = NULL;
00037
00038 EXPORT int create_collector(OBJECT **obj, OBJECT *parent)
00039 {
00040 *obj = gl_create_object(collector_class,sizeof(struct collector));
00041 if (*obj!=NULL)
00042 {
00043 struct collector *my = OBJECTDATA(*obj,struct collector);
00044 last_collector = *obj;
00045 gl_set_parent(*obj,parent);
00046 strcpy(my->file,"");
00047 strcpy(my->filetype,"txt");
00048 strcpy(my->delim,",");
00049 strcpy(my->property,"(undefined)");
00050 strcpy(my->group,"");
00051 my->interval = TS_NEVER;
00052 my->last.ts = -1;
00053 strcpy(my->last.value,"");
00054 my->limit = 0;
00055 my->samples = 0;
00056 my->status = TS_INIT;
00057 my->trigger[0]='\0';
00058 my->format = 0;
00059 my->aggr = NULL;
00060 return 1;
00061 }
00062 return 0;
00063 }
00064
00065 static int collector_open(OBJECT *obj)
00066 {
00067 char32 type="file";
00068 char1024 fname="";
00069 char32 flags="w";
00070 struct collector *my = OBJECTDATA(obj,struct collector);
00071
00072
00073 if (sscanf(my->file,"%32[^:]:%1024[^:]:%[^:]",type,fname,flags)==1)
00074 {
00075
00076 strcpy(fname,my->file);
00077 strcpy(type,"file");
00078 }
00079
00080
00081 if (strcmp(fname,"")==0)
00082 {
00083 char *p;
00084
00085 sprintf(fname,"%s.%s",my->group,my->filetype);
00086
00087
00088 for (p=fname; *p!='\0'; p++)
00089 {
00090 if (!isalnum(*p) && *p!='-' && *p!='.')
00091 *p='_';
00092 }
00093 }
00094
00095
00096 my->ops = get_ftable(type)->collector;
00097 if(my->ops == NULL)
00098 return 0;
00099 return my->ops->open(my, fname, flags);
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110 }
00111
00112 static int write_collector(struct collector *my, char *ts, char *value)
00113 {
00114 return my->ops->write(my, ts, value);
00115
00116 switch (my->type) {
00117 case FT_FILE:
00118 return file_write_collector(my,ts,value);
00119 break;
00120 case FT_ODBC:
00121 return odbc_write_collector(my,ts,value);
00122 break;
00123 case FT_MEMORY:
00124 return memory_write_collector(my,ts,value);
00125 break;
00126 default:
00127 return 0;
00128 break;
00129 }
00130 }
00131
00132 static TIMESTAMP collector_write(OBJECT *obj)
00133 {
00134 struct collector *my = OBJECTDATA(obj,struct collector);
00135 char ts[64];
00136 if (my->format==0)
00137 {
00138 time_t t = (time_t)(my->last.ts*TS_SECOND);
00139 strftime(ts,sizeof(ts),timestamp_format, gmtime(&t));
00140 }
00141 else
00142 sprintf(ts,"%" FMT_INT64 "d", my->last.ts);
00143 if ((my->limit>0 && my->samples>my->limit)
00144 || write_collector(my,ts,my->last.value)==0)
00145 {
00146 switch (my->type) {
00147 case FT_FILE:
00148 file_close_collector(my);
00149 break;
00150 case FT_ODBC:
00151 odbc_close_collector(my);
00152 break;
00153 case FT_MEMORY:
00154 memory_close_collector(my);
00155 break;
00156 default:
00157 break;
00158 }
00159 my->status = TS_DONE;
00160 }
00161 else
00162 my->samples++;
00163 return TS_NEVER;
00164 }
00165
00166 AGGREGATION *link_aggregates(char *aggregate_list, char *group)
00167 {
00168 char *item;
00169 AGGREGATION *first=NULL, *last=NULL;
00170 char1024 list;
00171 strcpy(list,aggregate_list);
00172 for (item=strtok(list,","); item!=NULL; item=strtok(NULL,","))
00173 {
00174 AGGREGATION *aggr = gl_create_aggregate(item,group);
00175 if (aggr!=NULL)
00176 {
00177
00178 if (first==NULL) first=aggr; else last->next=aggr;
00179 last=aggr;
00180 aggr->next = NULL;
00181 }
00182 else
00183 return NULL;
00184 }
00185 return first;
00186 }
00187
00188 int read_aggregates(AGGREGATION *aggr, char *buffer, int size)
00189 {
00190 AGGREGATION *p;
00191 int offset=0;
00192 int count=0;
00193 for (p=aggr; p!=NULL && offset<size-33; p=p->next)
00194 {
00195 if (offset>0) strcpy(buffer+offset++,",");
00196 offset+=sprintf(buffer+offset,"%lg",gl_run_aggregate(p));
00197 buffer[offset]='\0';
00198 count++;
00199 }
00200 return count;
00201 }
00202
00203 EXPORT TIMESTAMP sync_collector(OBJECT *obj, TIMESTAMP t0, PASSCONFIG pass)
00204 {
00205 struct collector *my = OBJECTDATA(obj,struct collector);
00206 typedef enum {NONE='\0', LT='<', EQ='=', GT='>'} COMPAREOP;
00207 COMPAREOP comparison;
00208 char1024 buffer = "";
00209
00210 if (my->status==TS_DONE)
00211 return TS_NEVER;
00212
00213
00214 if (my->aggr==NULL)
00215 my->aggr = link_aggregates(my->property,my->group);
00216
00217
00218 if (my->aggr==NULL)
00219 {
00220 sprintf(buffer,"'%s' contains an aggregate that is not found in the group '%s'", my->property, my->group);
00221 my->status = TS_ERROR;
00222 }
00223 else if (read_aggregates(my->aggr,buffer,sizeof(buffer))==0)
00224 {
00225 sprintf(buffer,"unable to read aggregate '%s' of group '%s'", my->property, my->group);
00226 my->status = TS_ERROR;
00227 }
00228
00229
00230 comparison = (COMPAREOP)my->trigger[0];
00231 if (comparison!=NONE)
00232 {
00233 int desired = comparison==LT ? -1 : (comparison==EQ ? 0 : (comparison==GT ? 1 : -2));
00234
00235
00236 int actual = strcmp(buffer,my->trigger+1);
00237 if (actual!=desired || (my->status==TS_INIT && !collector_open(obj)))
00238
00239
00240 return TS_NEVER;
00241 }
00242 else if (my->status==TS_INIT && !collector_open(obj))
00243 return TS_NEVER;
00244
00245
00246 if (my->status==TS_OPEN)
00247 {
00248 if (my->interval==0
00249 || ((my->interval==-1) && my->last.ts!=t0 && strcmp(buffer,my->last.value)!=0)
00250 || my->last.ts+my->interval<=t0)
00251 {
00252 my->last.ts = t0;
00253 strncpy(my->last.value,buffer,sizeof(my->last.value));
00254 collector_write(obj);
00255 }
00256 }
00257 else if (my->status==TS_ERROR)
00258 {
00259 gl_error("collector %d %s\n",obj->id, buffer);
00260 my->status=TS_DONE;
00261 return 0;
00262 }
00263
00264 return (my->interval==0 || my->interval==-1) ? TS_NEVER : t0+my->interval;
00265 }
00266