WSL/SLF GitLab Repository

TimeSeriesManager.cc 13.1 KB
Newer Older
1
/***********************************************************************************/
2
/*  Copyright 2014 WSL Institute for Snow and Avalanche Research    SLF-DAVOS      */
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/***********************************************************************************/
/* This file is part of MeteoIO.
    MeteoIO is free software: you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    MeteoIO is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with MeteoIO.  If not, see <http://www.gnu.org/licenses/>.
*/

#include <meteoio/TimeSeriesManager.h>

21
22
#include <algorithm>

23
24
25
26
27
28
using namespace std;

namespace mio {

TimeSeriesManager::TimeSeriesManager(IOHandler& in_iohandler, const Config& in_cfg) : cfg(in_cfg), iohandler(in_iohandler),
                                            meteoprocessor(in_cfg), dataGenerator(in_cfg),
29
30
                                            proc_properties(), point_cache(), raw_buffer(), filtered_cache(),
                                            chunk_size(), buff_before(),
31
32
33
34
35
36
37
38
                                            processing_level(IOUtils::filtered | IOUtils::resampled | IOUtils::generated)
{
	meteoprocessor.getWindowSize(proc_properties);
	setDfltBufferProperties();
}

void TimeSeriesManager::setDfltBufferProperties()
{
39
40
	double chunk_size_days = 370.; //default chunk size value
	cfg.getValue("BUFFER_SIZE", "General", chunk_size_days, IOUtils::nothrow); //in days
41
42
43
44
45
46
47
48
49
50
	chunk_size = Duration(chunk_size_days, 0);

	//get buffer centering options
	double buff_centering = -1.;
	double buff_start = -1.;
	cfg.getValue("BUFF_CENTERING", "General", buff_centering, IOUtils::nothrow);
	cfg.getValue("BUFF_BEFORE", "General", buff_start, IOUtils::nothrow);
	if ((buff_centering != -1.) && (buff_start != -1.))
		throw InvalidArgumentException("Please do NOT provide both BUFF_CENTERING and BUFF_BEFORE!!", AT);

51
	if (buff_start != -1.) {
52
53
		buff_before = Duration(buff_start, 0);
	} else {
54
		if (buff_centering != -1.) {
55
56
57
58
59
			if ((buff_centering < 0.) || (buff_centering > 1.))
				throw InvalidArgumentException("BUFF_CENTERING must be between 0 and 1", AT);

			buff_before = chunk_size * buff_centering;
		} else {
60
			buff_before = chunk_size * 0.05; //5% centering by default
61
62
63
64
		}
	}

	//if buff_before>chunk_size, we will have a problem (ie: we won't ever read the whole data we need)
65
	if (buff_before>chunk_size) chunk_size = buff_before;
66
67
68
69
70
71
	//BUG: if we do this, we still have the meteo1d window in the way
	//-> we end up not reading enough data and rebuffering...
}

void TimeSeriesManager::setMinBufferRequirements(const double& i_chunk_size, const double& i_buff_before)
{
72
	if (i_buff_before!=IOUtils::nodata) {
73
		const Duration app_buff_before(i_buff_before, 0);
74
		if (app_buff_before>buff_before) buff_before = app_buff_before;
75
	}
76
	if (i_chunk_size!=IOUtils::nodata) {
77
		const Duration app_chunk_size(i_chunk_size, 0);
78
		if (app_chunk_size>chunk_size) chunk_size = app_chunk_size;
79
80
81
	}

	//if buff_before>chunk_size, we will have a problem (ie: we won't ever read the whole data we need)
82
	if (buff_before>chunk_size) chunk_size = buff_before;
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
}

void TimeSeriesManager::setProcessingLevel(const unsigned int& i_level)
{
	if (i_level >= IOUtils::num_of_levels)
		throw InvalidArgumentException("The processing level is invalid", AT);

	if (((i_level & IOUtils::raw) == IOUtils::raw)
	    && ((i_level & IOUtils::filtered) == IOUtils::filtered))
		throw InvalidArgumentException("The processing level is invalid (raw and filtered at the same time)", AT);

	processing_level = i_level;
}

void TimeSeriesManager::push_meteo_data(const IOUtils::ProcessingLevel& level, const Date& date_start, const Date& date_end,
                                const std::vector< METEO_SET >& vecMeteo)
{
	//perform check on date_start and date_end
	if (date_end < date_start) {
		std::ostringstream ss;
		ss << "Trying to push data set from " << date_start.toString(Date::ISO) << " to " << date_end.toString(Date::ISO) << ". ";
		ss << " Obviously, date_start should be less than date_end!";
		throw InvalidArgumentException(ss.str(), AT);
	}

108
	if (level == IOUtils::filtered) {
109
		filtered_cache.push(date_start, date_end, vecMeteo);
110
	} else if (level == IOUtils::raw) {
111
		filtered_cache.clear();
112
		raw_buffer.push(date_start, date_end, vecMeteo);
113
114
115
116
117
118
119
120
121
122
123
	} else {
		throw InvalidArgumentException("The processing level is invalid (should be raw OR filtered)", AT);
	}

	point_cache.clear(); //clear point cache, so that we don't return resampled values of deprecated data
}

size_t TimeSeriesManager::getStationData(const Date& date, STATIONS_SET& vecStation)
{
	vecStation.clear();

124
	if (processing_level == IOUtils::raw) {
125
126
127
128
129
130
131
132
133
134
135
136
		iohandler.readStationData(date, vecStation);
	} else {
		iohandler.readStationData(date, vecStation);
	}

	return vecStation.size();
}

//for an interval of data: decide whether data should be filtered or raw
size_t TimeSeriesManager::getMeteoData(const Date& dateStart, const Date& dateEnd, std::vector< METEO_SET >& vecVecMeteo)
{
	vecVecMeteo.clear();
137
	if (processing_level == IOUtils::raw) {
138
139
		iohandler.readMeteoData(dateStart, dateEnd, vecVecMeteo);
	} else {
140
		const bool success = filtered_cache.get(dateStart, dateEnd, vecVecMeteo);
141

142
		if (!success) {
143
144
			vector< vector<MeteoData> > tmp_meteo;
			fillRawBuffer(dateStart, dateEnd);
145
			raw_buffer.get(dateStart, dateEnd, tmp_meteo);
146
147

			//now it needs to be secured that the data is actually filtered, if configured
148
			if ((IOUtils::filtered & processing_level) == IOUtils::filtered) {
149
				fill_filtered_cache();
150
				filtered_cache.get(dateStart, dateEnd, vecVecMeteo);
151
152
153
154
155
			} else {
				vecVecMeteo = tmp_meteo;
			}
		}

156
		if ((IOUtils::generated & processing_level) == IOUtils::generated)
157
158
159
160
161
162
163
164
165
166
167
168
169
			dataGenerator.fillMissing(vecVecMeteo);
	}

	return vecVecMeteo.size(); //equivalent with the number of stations that have data
}

size_t TimeSeriesManager::getMeteoData(const Date& i_date, METEO_SET& vecMeteo)
{
	vecMeteo.clear();

	//1. Check whether user wants raw data or processed data
	//The first case: we are looking at raw data directly, only unresampled values are considered, exact date match
	if (processing_level == IOUtils::raw) {
170
		std::vector< std::vector<MeteoData> > vec_cache;
171
172
		const Duration eps(1./(24.*3600.), 0.);
		iohandler.readMeteoData(i_date-eps, i_date+eps, vec_cache);
173
		for (size_t ii=0; ii<vec_cache.size(); ii++) { //for every station
174
175
			const size_t index = IOUtils::seek(i_date, vec_cache[ii], true);
			if (index != IOUtils::npos)
176
				vecMeteo.push_back( vec_cache[ii][index] ); //Insert station into vecMeteo
177
178
179
180
181
182
		}
		return vecMeteo.size();
	}

	//2.  Check which data point is available, buffered locally
	const map<Date, vector<MeteoData> >::const_iterator it = point_cache.find(i_date);
183
	if (it != point_cache.end()) {
184
185
186
187
188
189
		vecMeteo = it->second;
		return vecMeteo.size();
	}

	//Let's make sure we have the data we need, in the filtered_cache or in vec_cache
	const Date buffer_start( i_date-proc_properties.time_before ), buffer_end( i_date+proc_properties.time_after );
190
	std::vector< vector<MeteoData> >* data = NULL; //reference to either filtered_cache or raw_buffer
191
	if ((IOUtils::filtered & processing_level) == IOUtils::filtered) {
192
		const bool cached = (!filtered_cache.empty()) && (filtered_cache.getBufferStart() <= buffer_start) && (filtered_cache.getBufferEnd() >= buffer_end);
193
194
195
196
197
		if (!cached) {
			//explicit caching, rebuffer if necessary
			fillRawBuffer(buffer_start, buffer_end);
			fill_filtered_cache();
		}
198
		data = &filtered_cache.getBuffer();
199
200
	} else { //data to be resampled should be IOUtils::raw
		fillRawBuffer(buffer_start, buffer_end);
201
		data = &raw_buffer.getBuffer();
202
203
	}

204
	if ((IOUtils::resampled & processing_level) == IOUtils::resampled) { //resampling required
205
		for (size_t ii=0; ii<(*data).size(); ii++) { //for every station
206
			MeteoData md;
207
			const bool success = meteoprocessor.resample(i_date, (*data)[ii], md);
208
			if (success) vecMeteo.push_back( md );
209
		}
210
211
212
213
	} else { //no resampling required
		for (size_t ii=0; ii<(*data).size(); ii++) { //for every station
			const size_t index = IOUtils::seek(i_date, (*data)[ii], true); //needs to be an exact match
			if (index != IOUtils::npos)
214
				vecMeteo.push_back( (*data)[ii][index] ); //Insert station into vecMeteo
215
		}
216
217
	}

218
	if ((IOUtils::generated & processing_level) == IOUtils::generated)
219
220
221
222
223
224
225
226
227
		dataGenerator.fillMissing(vecMeteo);

	add_to_points_cache(i_date, vecMeteo); //Store result in the local cache

	return vecMeteo.size();
}

void TimeSeriesManager::writeMeteoData(const std::vector< METEO_SET >& vecMeteo, const std::string& name)
{
228
	if (processing_level == IOUtils::raw) {
229
230
231
232
233
234
235
236
		iohandler.writeMeteoData(vecMeteo, name);
	} else {
		iohandler.writeMeteoData(vecMeteo, name);
	}
}

double TimeSeriesManager::getAvgSamplingRate() const
{
237
	return raw_buffer.getAvgSamplingRate();
238
239
240
241
242
243
244
}

/**
 * @brief Filter the whole raw meteo data buffer
 */
void TimeSeriesManager::fill_filtered_cache()
{
245
	if ((IOUtils::filtered & processing_level) == IOUtils::filtered) {
246
247
		filtered_cache.clear(); //HACK until we get true ringbuffers, to prevent eating up all memory
		meteoprocessor.process(raw_buffer.getBuffer(), filtered_cache.getBuffer());
248
249
		filtered_cache.setBufferStart( raw_buffer.getBufferStart() );
		filtered_cache.setBufferEnd( raw_buffer.getBufferEnd() );
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
	}
}

void TimeSeriesManager::add_to_points_cache(const Date& i_date, const METEO_SET& vecMeteo)
{
	//Check cache size, delete oldest elements if necessary
	if (point_cache.size() > 2000) {
		point_cache.clear(); //HACK: implement a true ring buffer!
	}

	point_cache[i_date] = vecMeteo;
}

void TimeSeriesManager::clear_cache()
{
	raw_buffer.clear();
	filtered_cache.clear();
	point_cache.clear();
}

void TimeSeriesManager::fillRawBuffer(const Date& date_start, const Date& date_end)
{
272
273
	const Date new_start( date_start-buff_before ); //taking centering into account
	const Date new_end( max(new_start + chunk_size, date_end) );
274

275
	raw_buffer.clear(); //HACK until we have a proper ring buffer to avoid eating up all memory...
276
277

	if (raw_buffer.empty()) {
278
		std::vector< METEO_SET > vecMeteo;
279
280
281
		iohandler.readMeteoData(new_start, new_end, vecMeteo);
		raw_buffer.push(new_start, new_end, vecMeteo);
		return;
282
283
	}

284
285
286
	const Date buffer_start( raw_buffer.getBufferStart() );
	const Date buffer_end( raw_buffer.getBufferEnd() );
	if (new_start>buffer_end || new_end<buffer_start) { //easy: full rebuffer
287
		std::vector< METEO_SET > vecMeteo;
288
289
290
291
		iohandler.readMeteoData(new_start, new_end, vecMeteo);
		raw_buffer.push(new_start, new_end, vecMeteo);
		return;
	}
292

293
	if (new_start<buffer_start) { //some data must be inserted before
294
		std::vector< METEO_SET > vecMeteo;
295
296
297
		iohandler.readMeteoData(new_start, buffer_start, vecMeteo);
		raw_buffer.push(new_start, buffer_start, vecMeteo);
	}
298

299
	if (new_end>buffer_end) { //some data must be inserted after. Keep in mind both before and after could happen simultaneously!
300
		std::vector< METEO_SET > vecMeteo;
301
302
		iohandler.readMeteoData(buffer_end, new_end, vecMeteo);
		raw_buffer.push(buffer_end, new_end, vecMeteo);
303
	}
304

305
306
307
308
309
310
311
312
313
314
315
}

const std::string TimeSeriesManager::toString() const {
	ostringstream os;
	os << "<TimeSeriesManager>\n";
	os << "Config& cfg = " << hex << &cfg << dec << "\n";
	os << "IOHandler& iohandler = " << hex << &iohandler << dec << "\n";
	os << meteoprocessor.toString();
	os << "Processing level = " << processing_level << "\n";
	os << dataGenerator.toString();

316
	os << "RawBuffer:\n" << raw_buffer.toString();
317
	os << "Filteredcache:\n" << filtered_cache.toString();
318

319
	//display point_cache
320
321
322
323
324
325
	size_t count=0;
	size_t min_stations=std::numeric_limits<size_t>::max();
	size_t max_stations=0;
	std::map<Date, std::vector<MeteoData> >::const_iterator iter = point_cache.begin();
	for (; iter != point_cache.end(); ++iter) {
		const size_t nb_stations = iter->second.size();
326
327
		if (nb_stations>max_stations) max_stations=nb_stations;
		if (nb_stations<min_stations) min_stations=nb_stations;
328
329
330
		count++;
	}

331
	if (count==0) {
332
333
		os << "Resampled cache is empty\n";
	}
334
	if (count==1) {
335
		os << "Resampled cache content (";
336
		if (max_stations==min_stations)
337
338
339
340
341
342
			os << min_stations;
		else
			os << min_stations << " to " << max_stations;
		os << " station(s))\n";
		os << std::setw(22) << point_cache.begin()->first.toString(Date::ISO) << " - 1 timestep\n";
	}
343
	if (count>1) {
344
345
346
		const double avg_sampling = ( (point_cache.rbegin()->first.getJulian()) - (point_cache.begin()->first.getJulian()) ) / (double)(count-1);

		os << "Resampled cache content (";
347
		if (max_stations==min_stations)
348
349
350
351
352
353
354
355
356
357
358
359
360
361
			os << min_stations;
		else
			os << min_stations << " to " << max_stations;
		os << " station(s))\n";
		os << std::setw(22) << point_cache.begin()->first.toString(Date::ISO);
		os << " - " << point_cache.rbegin()->first.toString(Date::ISO);
		os << " - " << count << " timesteps (" << setprecision(3) << fixed << avg_sampling*24.*3600. << " s sampling rate)";
	}

	os << "</TimeSeriesManager>\n";
	return os.str();
}

} //namespace