WSL/SLF GitLab Repository

TimeSeriesManager.cc 13.1 KB
Newer Older
1
/***********************************************************************************/
2
/*  Copyright 2014 WSL Institute for Snow and Avalanche Research    SLF-DAVOS      */
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/***********************************************************************************/
/* This file is part of MeteoIO.
    MeteoIO is free software: you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    MeteoIO is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with MeteoIO.  If not, see <http://www.gnu.org/licenses/>.
*/

#include <meteoio/TimeSeriesManager.h>

21
22
#include <algorithm>

23
24
25
26
27
28
using namespace std;

namespace mio {

TimeSeriesManager::TimeSeriesManager(IOHandler& in_iohandler, const Config& in_cfg) : cfg(in_cfg), iohandler(in_iohandler),
                                            meteoprocessor(in_cfg), dataGenerator(in_cfg),
29
30
                                            proc_properties(), point_cache(), raw_buffer(), filtered_cache(),
                                            chunk_size(), buff_before(),
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
                                            processing_level(IOUtils::filtered | IOUtils::resampled | IOUtils::generated)
{
	meteoprocessor.getWindowSize(proc_properties);
	setDfltBufferProperties();
}

void TimeSeriesManager::setDfltBufferProperties()
{
	double chunk_size_days = 15.; //default chunk size value
	cfg.getValue("BUFF_CHUNK_SIZE", "General", chunk_size_days, IOUtils::nothrow); //in days
	chunk_size = Duration(chunk_size_days, 0);

	//get buffer centering options
	double buff_centering = -1.;
	double buff_start = -1.;
	cfg.getValue("BUFF_CENTERING", "General", buff_centering, IOUtils::nothrow);
	cfg.getValue("BUFF_BEFORE", "General", buff_start, IOUtils::nothrow);
	if ((buff_centering != -1.) && (buff_start != -1.))
		throw InvalidArgumentException("Please do NOT provide both BUFF_CENTERING and BUFF_BEFORE!!", AT);

	if (buff_start != -1.){
		buff_before = Duration(buff_start, 0);
	} else {
		if (buff_centering != -1.){
			if ((buff_centering < 0.) || (buff_centering > 1.))
				throw InvalidArgumentException("BUFF_CENTERING must be between 0 and 1", AT);

			buff_before = chunk_size * buff_centering;
		} else {
			buff_before = chunk_size * 0.1; //10% centering by default
		}
	}

	//if buff_before>chunk_size, we will have a problem (ie: we won't ever read the whole data we need)
65
	if (buff_before>chunk_size) chunk_size = buff_before;
66
67
68
69
70
71
	//BUG: if we do this, we still have the meteo1d window in the way
	//-> we end up not reading enough data and rebuffering...
}

void TimeSeriesManager::setMinBufferRequirements(const double& i_chunk_size, const double& i_buff_before)
{
72
	if (i_buff_before!=IOUtils::nodata) {
73
		const Duration app_buff_before(i_buff_before, 0);
74
		if (app_buff_before>buff_before) buff_before = app_buff_before;
75
	}
76
	if (i_chunk_size!=IOUtils::nodata) {
77
		const Duration app_chunk_size(i_chunk_size, 0);
78
		if (app_chunk_size>chunk_size) chunk_size = app_chunk_size;
79
80
81
	}

	//if buff_before>chunk_size, we will have a problem (ie: we won't ever read the whole data we need)
82
	if (buff_before>chunk_size) chunk_size = buff_before;
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
}

void TimeSeriesManager::setProcessingLevel(const unsigned int& i_level)
{
	if (i_level >= IOUtils::num_of_levels)
		throw InvalidArgumentException("The processing level is invalid", AT);

	if (((i_level & IOUtils::raw) == IOUtils::raw)
	    && ((i_level & IOUtils::filtered) == IOUtils::filtered))
		throw InvalidArgumentException("The processing level is invalid (raw and filtered at the same time)", AT);

	processing_level = i_level;
}

void TimeSeriesManager::push_meteo_data(const IOUtils::ProcessingLevel& level, const Date& date_start, const Date& date_end,
                                const std::vector< METEO_SET >& vecMeteo)
{
	//perform check on date_start and date_end
	if (date_end < date_start) {
		std::ostringstream ss;
		ss << "Trying to push data set from " << date_start.toString(Date::ISO) << " to " << date_end.toString(Date::ISO) << ". ";
		ss << " Obviously, date_start should be less than date_end!";
		throw InvalidArgumentException(ss.str(), AT);
	}

108
	if (level == IOUtils::filtered) {
109
		filtered_cache.push(date_start, date_end, vecMeteo);
110
	} else if (level == IOUtils::raw) {
111
		filtered_cache.clear();
112
		raw_buffer.push(date_start, date_end, vecMeteo);
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
	} else {
		throw InvalidArgumentException("The processing level is invalid (should be raw OR filtered)", AT);
	}

	point_cache.clear(); //clear point cache, so that we don't return resampled values of deprecated data
}

size_t TimeSeriesManager::getStationData(const Date& date, STATIONS_SET& vecStation)
{
	vecStation.clear();

	if (processing_level == IOUtils::raw){
		iohandler.readStationData(date, vecStation);
	} else {
		iohandler.readStationData(date, vecStation);
	}

	return vecStation.size();
}

//for an interval of data: decide whether data should be filtered or raw
size_t TimeSeriesManager::getMeteoData(const Date& dateStart, const Date& dateEnd, std::vector< METEO_SET >& vecVecMeteo)
{
	vecVecMeteo.clear();
	if (processing_level == IOUtils::raw){
		iohandler.readMeteoData(dateStart, dateEnd, vecVecMeteo);
	} else {
140
		const bool success = filtered_cache.get(dateStart, dateEnd, vecVecMeteo);
141
142
143
144

		if (!success){
			vector< vector<MeteoData> > tmp_meteo;
			fillRawBuffer(dateStart, dateEnd);
145
			raw_buffer.get(dateStart, dateEnd, tmp_meteo);
146
147
148
149

			//now it needs to be secured that the data is actually filtered, if configured
			if ((IOUtils::filtered & processing_level) == IOUtils::filtered){
				fill_filtered_cache();
150
				filtered_cache.get(dateStart, dateEnd, vecVecMeteo);
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
			} else {
				vecVecMeteo = tmp_meteo;
			}
		}

		if ((IOUtils::generated & processing_level) == IOUtils::generated){
			dataGenerator.createParameters(vecVecMeteo);
			dataGenerator.fillMissing(vecVecMeteo);
		}
	}

	return vecVecMeteo.size(); //equivalent with the number of stations that have data
}

size_t TimeSeriesManager::getMeteoData(const Date& i_date, METEO_SET& vecMeteo)
{
	vecMeteo.clear();

	//1. Check whether user wants raw data or processed data
	//The first case: we are looking at raw data directly, only unresampled values are considered, exact date match
	if (processing_level == IOUtils::raw) {
172
173
174
		vector< vector<MeteoData> > vec_cache;
		const Duration eps(1./(24.*3600.), 0.);
		iohandler.readMeteoData(i_date-eps, i_date+eps, vec_cache);
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
		for (size_t ii=0; ii<vec_cache.size(); ii++){ //for every station
			const size_t index = IOUtils::seek(i_date, vec_cache[ii], true);
			if (index != IOUtils::npos)
				vecMeteo.push_back(vec_cache[ii][index]); //Insert station into vecMeteo
		}
		return vecMeteo.size();
	}

	//2.  Check which data point is available, buffered locally
	const map<Date, vector<MeteoData> >::const_iterator it = point_cache.find(i_date);
	if (it != point_cache.end()){
		vecMeteo = it->second;
		return vecMeteo.size();
	}

	//Let's make sure we have the data we need, in the filtered_cache or in vec_cache
	const Date buffer_start( i_date-proc_properties.time_before ), buffer_end( i_date+proc_properties.time_after );
192
	vector< vector<MeteoData> >* data = NULL; //reference to either filtered_cache or raw_buffer
193
	if ((IOUtils::filtered & processing_level) == IOUtils::filtered){
194
		const bool cached = (!filtered_cache.empty()) && (filtered_cache.getBufferStart() <= buffer_start) && (filtered_cache.getBufferEnd() >= buffer_end);
195
196
197
198
199
		if (!cached) {
			//explicit caching, rebuffer if necessary
			fillRawBuffer(buffer_start, buffer_end);
			fill_filtered_cache();
		}
200
		data = &filtered_cache.getBuffer();
201
202
	} else { //data to be resampled should be IOUtils::raw
		fillRawBuffer(buffer_start, buffer_end);
203
		data = &raw_buffer.getBuffer();
204
205
	}

206
	if ((IOUtils::resampled & processing_level) == IOUtils::resampled) { //resampling required
207
		for (size_t ii=0; ii<(*data).size(); ii++) { //for every station
208
			MeteoData md;
209
210
211
			const bool success = meteoprocessor.resample(i_date, (*data)[ii], md);
			if (success) vecMeteo.push_back(md);
		}
212
213
214
215
216
217
	} else { //no resampling required
		for (size_t ii=0; ii<(*data).size(); ii++) { //for every station
			const size_t index = IOUtils::seek(i_date, (*data)[ii], true); //needs to be an exact match
			if (index != IOUtils::npos)
				vecMeteo.push_back((*data)[ii][index]); //Insert station into vecMeteo
		}
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
	}

	if ((IOUtils::generated & processing_level) == IOUtils::generated) {
		dataGenerator.createParameters(vecMeteo);
		dataGenerator.fillMissing(vecMeteo);
	}

	add_to_points_cache(i_date, vecMeteo); //Store result in the local cache

	return vecMeteo.size();
}

void TimeSeriesManager::writeMeteoData(const std::vector< METEO_SET >& vecMeteo, const std::string& name)
{
	if (processing_level == IOUtils::raw){
		iohandler.writeMeteoData(vecMeteo, name);
	} else {
		iohandler.writeMeteoData(vecMeteo, name);
	}
}

double TimeSeriesManager::getAvgSamplingRate() const
{
241
	return raw_buffer.getAvgSamplingRate();
242
243
244
245
246
247
248
249
}

/**
 * @brief Filter the whole raw meteo data buffer
 */
void TimeSeriesManager::fill_filtered_cache()
{
	if ((IOUtils::filtered & processing_level) == IOUtils::filtered){
250
251
252
253
		filtered_cache.clear(); //HACK until we get true ringbuffers, to prevent eating up all memory
		meteoprocessor.process(raw_buffer.getBuffer(), filtered_cache.getBuffer());
		filtered_cache.setBufferStart(raw_buffer.getBufferStart()); //HACK
		filtered_cache.setBufferEnd(raw_buffer.getBufferEnd()); //HACK
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
	}
}

void TimeSeriesManager::add_to_points_cache(const Date& i_date, const METEO_SET& vecMeteo)
{
	//Check cache size, delete oldest elements if necessary
	if (point_cache.size() > 2000) {
		point_cache.clear(); //HACK: implement a true ring buffer!
	}

	point_cache[i_date] = vecMeteo;
}

void TimeSeriesManager::clear_cache()
{
	raw_buffer.clear();
	filtered_cache.clear();
	point_cache.clear();
}

void TimeSeriesManager::fillRawBuffer(const Date& date_start, const Date& date_end)
{
276
277
	const Date new_start( date_start-buff_before ); //taking centering into account
	const Date new_end( max(new_start + chunk_size, date_end) );
278

279
	raw_buffer.clear(); //HACK until we have a proper ring buffer to avoid eating up all memory...
280
281

	if (raw_buffer.empty()) {
282
283
284
285
		vector< METEO_SET > vecMeteo;
		iohandler.readMeteoData(new_start, new_end, vecMeteo);
		raw_buffer.push(new_start, new_end, vecMeteo);
		return;
286
287
	}

288
289
290
291
292
293
294
295
	const Date buffer_start( raw_buffer.getBufferStart() );
	const Date buffer_end( raw_buffer.getBufferEnd() );
	if (new_start>buffer_end || new_end<buffer_start) { //easy: full rebuffer
		vector< METEO_SET > vecMeteo;
		iohandler.readMeteoData(new_start, new_end, vecMeteo);
		raw_buffer.push(new_start, new_end, vecMeteo);
		return;
	}
296

297
298
299
300
301
	if (new_start<buffer_start) { //some data must be inserted before
		vector< METEO_SET > vecMeteo;
		iohandler.readMeteoData(new_start, buffer_start, vecMeteo);
		raw_buffer.push(new_start, buffer_start, vecMeteo);
	}
302

303
304
305
306
	if (new_end>buffer_end) { //some data must be inserted after. Keep in mind both before and after could happen simultaneously!
		vector< METEO_SET > vecMeteo;
		iohandler.readMeteoData(buffer_end, new_end, vecMeteo);
		raw_buffer.push(buffer_end, new_end, vecMeteo);
307
	}
308

309
310
311
312
313
314
315
316
317
318
319
}

const std::string TimeSeriesManager::toString() const {
	ostringstream os;
	os << "<TimeSeriesManager>\n";
	os << "Config& cfg = " << hex << &cfg << dec << "\n";
	os << "IOHandler& iohandler = " << hex << &iohandler << dec << "\n";
	os << meteoprocessor.toString();
	os << "Processing level = " << processing_level << "\n";
	os << dataGenerator.toString();

320
	os << "RawBuffer:\n" << raw_buffer.toString();
321
	os << "Filteredcache:\n" << filtered_cache.toString();
322

323
	//display point_cache
324
325
326
327
328
329
	size_t count=0;
	size_t min_stations=std::numeric_limits<size_t>::max();
	size_t max_stations=0;
	std::map<Date, std::vector<MeteoData> >::const_iterator iter = point_cache.begin();
	for (; iter != point_cache.end(); ++iter) {
		const size_t nb_stations = iter->second.size();
330
331
		if (nb_stations>max_stations) max_stations=nb_stations;
		if (nb_stations<min_stations) min_stations=nb_stations;
332
333
334
		count++;
	}

335
	if (count==0) {
336
337
		os << "Resampled cache is empty\n";
	}
338
	if (count==1) {
339
		os << "Resampled cache content (";
340
		if (max_stations==min_stations)
341
342
343
344
345
346
			os << min_stations;
		else
			os << min_stations << " to " << max_stations;
		os << " station(s))\n";
		os << std::setw(22) << point_cache.begin()->first.toString(Date::ISO) << " - 1 timestep\n";
	}
347
	if (count>1) {
348
349
350
		const double avg_sampling = ( (point_cache.rbegin()->first.getJulian()) - (point_cache.begin()->first.getJulian()) ) / (double)(count-1);

		os << "Resampled cache content (";
351
		if (max_stations==min_stations)
352
353
354
355
356
357
358
359
360
361
362
363
364
365
			os << min_stations;
		else
			os << min_stations << " to " << max_stations;
		os << " station(s))\n";
		os << std::setw(22) << point_cache.begin()->first.toString(Date::ISO);
		os << " - " << point_cache.rbegin()->first.toString(Date::ISO);
		os << " - " << count << " timesteps (" << setprecision(3) << fixed << avg_sampling*24.*3600. << " s sampling rate)";
	}

	os << "</TimeSeriesManager>\n";
	return os.str();
}

} //namespace