/***********************************************************************************/
/* Copyright 2014 WSL Institute for Snow and Avalanche Research SLF-DAVOS */
/***********************************************************************************/
/* This file is part of MeteoIO.
MeteoIO is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
MeteoIO is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with MeteoIO. If not, see <http://www.gnu.org/licenses/>.
*/
#include
#include
using namespace std;
namespace mio {
/**
 * @brief Build a time series manager on top of an I/O handler and a configuration.
 * The processing stack (meteoprocessor) and the data generator are both constructed from the
 * configuration. All buffers and caches start out default-constructed and the processing level
 * defaults to filtered + resampled + generated.
 * @param in_iohandler handler that is kept by reference and used for all reads and writes
 * @param in_cfg configuration that is kept by reference; also used to read the buffering options
 */
TimeSeriesManager::TimeSeriesManager(IOHandler& in_iohandler, const Config& in_cfg) : cfg(in_cfg), iohandler(in_iohandler),
meteoprocessor(in_cfg), dataGenerator(in_cfg),
proc_properties(), point_cache(), raw_buffer(), filtered_cache(),
chunk_size(), buff_before(),
processing_level(IOUtils::filtered | IOUtils::resampled | IOUtils::generated)
{
//let the processing stack report its window requirements into proc_properties
//(its time_before/time_after are used later to size the single-date requests)
meteoprocessor.getWindowSize(proc_properties);
//initialize chunk_size and buff_before from the configuration (or their defaults)
setDfltBufferProperties();
}
void TimeSeriesManager::setDfltBufferProperties()
{
double chunk_size_days = 370.; //default chunk size value
cfg.getValue("BUFFER_SIZE", "General", chunk_size_days, IOUtils::nothrow); //in days
chunk_size = Duration(chunk_size_days, 0);
//get buffer centering options
double buff_centering = -1.;
double buff_start = -1.;
cfg.getValue("BUFF_CENTERING", "General", buff_centering, IOUtils::nothrow);
cfg.getValue("BUFF_BEFORE", "General", buff_start, IOUtils::nothrow);
if ((buff_centering != -1.) && (buff_start != -1.))
throw InvalidArgumentException("Please do NOT provide both BUFF_CENTERING and BUFF_BEFORE!!", AT);
if (buff_start != -1.) {
buff_before = Duration(buff_start, 0);
} else {
if (buff_centering != -1.) {
if ((buff_centering < 0.) || (buff_centering > 1.))
throw InvalidArgumentException("BUFF_CENTERING must be between 0 and 1", AT);
buff_before = chunk_size * buff_centering;
} else {
buff_before = chunk_size * 0.05; //5% centering by default
}
}
//if buff_before>chunk_size, we will have a problem (ie: we won't ever read the whole data we need)
if (buff_before>chunk_size) chunk_size = buff_before;
//BUG: if we do this, we still have the meteo1d window in the way
//-> we end up not reading enough data and rebuffering...
}
void TimeSeriesManager::setMinBufferRequirements(const double& i_chunk_size, const double& i_buff_before)
{
if (i_buff_before!=IOUtils::nodata) {
const Duration app_buff_before(i_buff_before, 0);
if (app_buff_before>buff_before) buff_before = app_buff_before;
}
if (i_chunk_size!=IOUtils::nodata) {
const Duration app_chunk_size(i_chunk_size, 0);
if (app_chunk_size>chunk_size) chunk_size = app_chunk_size;
}
//if buff_before>chunk_size, we will have a problem (ie: we won't ever read the whole data we need)
if (buff_before>chunk_size) chunk_size = buff_before;
}
/**
 * @brief Select which processing steps are applied to the data that is handed out.
 * @param i_level bitwise combination of IOUtils processing level flags; it must be lower than
 * IOUtils::num_of_levels and must not contain both the raw and the filtered flags
 * @throws InvalidArgumentException if the level is out of range or combines raw and filtered
 */
void TimeSeriesManager::setProcessingLevel(const unsigned int& i_level)
{
	if (i_level >= IOUtils::num_of_levels)
		throw InvalidArgumentException("The processing level is invalid", AT);

	const bool wants_raw = ((i_level & IOUtils::raw) == IOUtils::raw);
	const bool wants_filtered = ((i_level & IOUtils::filtered) == IOUtils::filtered);
	if (wants_raw && wants_filtered)
		throw InvalidArgumentException("The processing level is invalid (raw and filtered at the same time)", AT);

	processing_level = i_level;
}
/**
 * @brief Inject externally provided data into the raw buffer or into the filtered cache.
 * Pushing raw data also empties the filtered cache. In both cases the cache of single-date
 * results is emptied, since its content might be based on data that is now outdated.
 * @param level target of the push, either IOUtils::raw or IOUtils::filtered
 * @param date_start start of the period covered by the data
 * @param date_end end of the period covered by the data (must not be before date_start)
 * @param vecMeteo the data to push, one METEO_SET per station
 * @throws InvalidArgumentException if date_end is before date_start or if the level is neither raw nor filtered
 */
void TimeSeriesManager::push_meteo_data(const IOUtils::ProcessingLevel& level, const Date& date_start, const Date& date_end,
                                        const std::vector< METEO_SET >& vecMeteo)
{
	//perform check on date_start and date_end
	if (date_end < date_start) {
		std::ostringstream msg;
		msg << "Trying to push data set from " << date_start.toString(Date::ISO) << " to " << date_end.toString(Date::ISO) << ". "
		    << " Obviously, date_start should be less than date_end!";
		throw InvalidArgumentException(msg.str(), AT);
	}

	const bool into_filtered = (level == IOUtils::filtered);
	const bool into_raw = (level == IOUtils::raw);
	if (!into_filtered && !into_raw)
		throw InvalidArgumentException("The processing level is invalid (should be raw OR filtered)", AT);

	if (into_filtered) {
		filtered_cache.push(date_start, date_end, vecMeteo);
	} else {
		//the filtered data derived from the previous raw data is not valid anymore
		filtered_cache.clear();
		raw_buffer.push(date_start, date_end, vecMeteo);
	}

	//clear point cache, so that we don't return resampled values of deprecated data
	point_cache.clear();
}
/**
 * @brief Read the stations' metadata that is valid at a given date.
 * The request is forwarded to the IOHandler independently of the processing level
 * (the former raw/non-raw branches were identical and have been merged).
 * @param date date for which the metadata is requested
 * @param vecStation container that receives the metadata; it is cleared first
 * @return number of stations in vecStation
 */
size_t TimeSeriesManager::getStationData(const Date& date, STATIONS_SET& vecStation)
{
	vecStation.clear();
	iohandler.readStationData(date, vecStation);
	return vecStation.size();
}
/**
 * @brief Retrieve the data of all stations for a time interval.
 * For the raw processing level, the request is forwarded as is to the IOHandler. For any
 * other level, the filtered cache is queried first; if it cannot serve the request, the raw
 * buffer is refilled and, when filtering is part of the processing level, the filtered cache is
 * rebuilt from it before being queried again. When the "generated" flag is set, the data
 * generator is finally run on the result. This overload never calls the resampling.
 * @param dateStart start of the requested interval
 * @param dateEnd end of the requested interval
 * @param vecVecMeteo container that receives the data; it is cleared first
 * @return vecVecMeteo.size()
 */
//for an interval of data: decide whether data should be filtered or raw
size_t TimeSeriesManager::getMeteoData(const Date& dateStart, const Date& dateEnd, std::vector< METEO_SET >& vecVecMeteo)
{
vecVecMeteo.clear();
if (processing_level == IOUtils::raw) {
//raw data: bypass all buffers and caches
iohandler.readMeteoData(dateStart, dateEnd, vecVecMeteo);
} else {
//first try: the filtered cache (queried even when the filtered flag is not set)
const bool success = filtered_cache.get(dateStart, dateEnd, vecVecMeteo);
if (!success) {
//NOTE(review): the template arguments of the next declaration seem to have been lost
//(presumably a vector of vectors of meteo data, like vecVecMeteo) - confirm against the repository
vector< vector > tmp_meteo;
fillRawBuffer(dateStart, dateEnd);
//the raw data is extracted in any case, but it is only used when no filtering is configured
raw_buffer.get(dateStart, dateEnd, tmp_meteo);
//now it needs to be secured that the data is actually filtered, if configured
if ((IOUtils::filtered & processing_level) == IOUtils::filtered) {
fill_filtered_cache();
filtered_cache.get(dateStart, dateEnd, vecVecMeteo);
} else {
vecVecMeteo = tmp_meteo;
}
}
//the data generator is run on cached as well as on freshly read data
if ((IOUtils::generated & processing_level) == IOUtils::generated)
dataGenerator.fillMissing(vecVecMeteo);
}
return vecVecMeteo.size(); //equivalent with the number of stations that have data
}
/**
 * @brief Retrieve the data of all stations for one given date.
 * For processed data, the result comes from the cache of single-date results when the date
 * has already been requested. Otherwise the necessary buffers are (re)filled around the date
 * (using the window reported by the processing stack), the value of each station is either
 * resampled or looked up as an exact match, the data generator is run if configured and the
 * result is stored in the cache of single-date results.
 * @param i_date date for which the data is requested
 * @param vecMeteo container that receives the data; it is cleared first
 * @return vecMeteo.size()
 */
size_t TimeSeriesManager::getMeteoData(const Date& i_date, METEO_SET& vecMeteo)
{
vecMeteo.clear();
//1. Check whether user wants raw data or processed data
//The first case: we are looking at raw data directly, only unresampled values are considered, exact date match
if (processing_level == IOUtils::raw) {
//NOTE(review): the template arguments of the next declaration seem to have been lost - confirm against the repository
std::vector< std::vector > vec_cache;
//read a narrow window around i_date (1/(24*3600), presumably one second expressed in days)
const Duration eps(1./(24.*3600.), 0.);
iohandler.readMeteoData(i_date-eps, i_date+eps, vec_cache);
//NOTE(review): the next line looks truncated: the loop over vec_cache (with the end of the raw
//branch) and the declaration of the point_cache lookup iterator seem to have been fused together.
//The missing part can not be reconstructed from here - confirm against the repository
for (size_t ii=0; ii >::const_iterator it = point_cache.find(i_date);
//this exact date has already been computed: hand out the cached result
if (it != point_cache.end()) {
vecMeteo = it->second;
return vecMeteo.size();
}
//Let's make sure we have the data we need, in the filtered_cache or in vec_cache
const Date buffer_start( i_date-proc_properties.time_before ), buffer_end( i_date+proc_properties.time_after );
//NOTE(review): template arguments seem to be missing in the next declaration too
std::vector< vector >* data = NULL; //reference to either filtered_cache or raw_buffer
if ((IOUtils::filtered & processing_level) == IOUtils::filtered) {
//the filtered cache can be used as is only if it fully covers [buffer_start, buffer_end]
const bool cached = (!filtered_cache.empty()) && (filtered_cache.getBufferStart() <= buffer_start) && (filtered_cache.getBufferEnd() >= buffer_end);
if (!cached) {
//explicit caching, rebuffer if necessary
fillRawBuffer(buffer_start, buffer_end);
fill_filtered_cache();
}
data = &filtered_cache.getBuffer();
} else { //data to be resampled should be IOUtils::raw
//no coverage check here: fillRawBuffer() is called for every request that missed the point cache
fillRawBuffer(buffer_start, buffer_end);
data = &raw_buffer.getBuffer();
}
if ((IOUtils::resampled & processing_level) == IOUtils::resampled) { //resampling required
for (size_t ii=0; ii<(*data).size(); ii++) { //for every station
MeteoData md;
const bool success = meteoprocessor.resample(i_date, (*data)[ii], md);
//stations for which the resampling failed are left out of the result
if (success) vecMeteo.push_back( md );
}
} else { //no resampling required
for (size_t ii=0; ii<(*data).size(); ii++) { //for every station
const size_t index = IOUtils::seek(i_date, (*data)[ii], true); //needs to be an exact match
//stations without a data point at exactly i_date are left out of the result
if (index != IOUtils::npos)
vecMeteo.push_back( (*data)[ii][index] ); //Insert station into vecMeteo
}
}
if ((IOUtils::generated & processing_level) == IOUtils::generated)
dataGenerator.fillMissing(vecMeteo);
add_to_points_cache(i_date, vecMeteo); //Store result in the local cache
return vecMeteo.size();
}
/**
 * @brief Write meteo data through the IOHandler.
 * The data is forwarded as provided, independently of the processing level
 * (the former raw/non-raw branches were identical and have been merged).
 * @param vecMeteo the data to write, one METEO_SET per station
 * @param name name that is handed over unchanged to the IOHandler
 */
void TimeSeriesManager::writeMeteoData(const std::vector< METEO_SET >& vecMeteo, const std::string& name)
{
	iohandler.writeMeteoData(vecMeteo, name);
}
/**
 * @brief Get the average sampling rate of the data that is currently in the raw buffer.
 * @return the value reported by the raw buffer's own getAvgSamplingRate()
 * (units and behavior on an empty buffer are defined by the buffer class, not visible here)
 */
double TimeSeriesManager::getAvgSamplingRate() const
{
return raw_buffer.getAvgSamplingRate();
}
/**
 * @brief Run the processing stack on the whole raw buffer and store the result in the filtered cache.
 * Nothing is done when filtering is not part of the current processing level. Otherwise the
 * previous content of the filtered cache is discarded and the cache is declared to cover the
 * same period as the raw buffer.
 */
void TimeSeriesManager::fill_filtered_cache()
{
	const bool filtering_enabled = ((IOUtils::filtered & processing_level) == IOUtils::filtered);
	if (!filtering_enabled)
		return;

	//HACK until we get true ringbuffers, to prevent eating up all memory
	filtered_cache.clear();
	meteoprocessor.process(raw_buffer.getBuffer(), filtered_cache.getBuffer());

	//the filtered data spans the same period as the raw data it has been computed from
	filtered_cache.setBufferStart( raw_buffer.getBufferStart() );
	filtered_cache.setBufferEnd( raw_buffer.getBufferEnd() );
}
/**
 * @brief Store the result of a single-date request in the point cache.
 * Once the cache holds more than 2000 entries, it is completely emptied before the new entry
 * is inserted. An existing entry for the same date is overwritten.
 * @param i_date date of the request, used as key
 * @param vecMeteo the data to cache for this date
 */
void TimeSeriesManager::add_to_points_cache(const Date& i_date, const METEO_SET& vecMeteo)
{
	const size_t max_cached_dates = 2000;

	//HACK: implement a true ring buffer! For now, everything is dropped when the cache is too large
	if (point_cache.size() > max_cached_dates)
		point_cache.clear();

	point_cache[i_date] = vecMeteo;
}
/**
 * @brief Empty the raw buffer, the filtered cache and the cache of single-date results.
 */
void TimeSeriesManager::clear_cache()
{
raw_buffer.clear();
filtered_cache.clear();
point_cache.clear();
}
/**
 * @brief Read the data for a requested period from the IOHandler into the raw buffer.
 * The period that is read starts buff_before ahead of date_start and ends chunk_size later,
 * or at date_end if this is even later.
 * @param date_start start of the period that must be available
 * @param date_end end of the period that must be available
 */
void TimeSeriesManager::fillRawBuffer(const Date& date_start, const Date& date_end)
{
const Date new_start( date_start-buff_before ); //taking centering into account
const Date new_end( max(new_start + chunk_size, date_end) );
raw_buffer.clear(); //HACK until we have a proper ring buffer to avoid eating up all memory...
//NOTE(review): since the buffer has just been cleared, this branch is presumably always taken
//(assuming that clear() leaves the buffer empty), so that the incremental code below it is
//currently never reached - confirm
if (raw_buffer.empty()) {
std::vector< METEO_SET > vecMeteo;
iohandler.readMeteoData(new_start, new_end, vecMeteo);
raw_buffer.push(new_start, new_end, vecMeteo);
return;
}
//period covered by the data that is already buffered
const Date buffer_start( raw_buffer.getBufferStart() );
const Date buffer_end( raw_buffer.getBufferEnd() );
//NOTE(review): the next line looks truncated (end of the condition and start of its block are missing).
//From what remains, it handles a requested period that does not overlap with the buffered one
//by reading the whole requested period - confirm against the repository
if (new_start>buffer_end || new_end vecMeteo;
iohandler.readMeteoData(new_start, new_end, vecMeteo);
raw_buffer.push(new_start, new_end, vecMeteo);
return;
}
//NOTE(review): the next line looks truncated as well; from what remains, it handles data that
//must be inserted before the buffered period - confirm against the repository
if (new_start vecMeteo;
iohandler.readMeteoData(new_start, buffer_start, vecMeteo);
raw_buffer.push(new_start, buffer_start, vecMeteo);
}
if (new_end>buffer_end) { //some data must be inserted after. Keep in mind both before and after could happen simultaneously!
std::vector< METEO_SET > vecMeteo;
iohandler.readMeteoData(buffer_end, new_end, vecMeteo);
raw_buffer.push(buffer_end, new_end, vecMeteo);
}
}
/**
 * @brief Build a human readable, multi-line description of this object, for debugging.
 * It contains the addresses of the configuration and of the IOHandler, the descriptions of the
 * processing stack, of the data generator, of the raw buffer and of the filtered cache, the
 * processing level and a summary of the cache of single-date results.
 * @return the description
 */
const std::string TimeSeriesManager::toString() const {
ostringstream os;
os << "\n";
os << "Config& cfg = " << hex << &cfg << dec << "\n";
os << "IOHandler& iohandler = " << hex << &iohandler << dec << "\n";
os << meteoprocessor.toString();
os << "Processing level = " << processing_level << "\n";
os << dataGenerator.toString();
os << "RawBuffer:\n" << raw_buffer.toString();
os << "Filteredcache:\n" << filtered_cache.toString();
//display point_cache
//NOTE(review): template arguments seem to have been lost in the numeric_limits and in the
//map iterator declarations below - confirm against the repository
size_t count=0;
size_t min_stations=std::numeric_limits::max();
size_t max_stations=0;
std::map >::const_iterator iter = point_cache.begin();
//gather the smallest and largest number of stations over all cached dates
for (; iter != point_cache.end(); ++iter) {
const size_t nb_stations = iter->second.size();
if (nb_stations>max_stations) max_stations=nb_stations;
//NOTE(review): the next line looks truncated: the update of min_stations, the counting of the
//cached dates and the start of the output for a cache that contains a single date seem to
//have been fused together - confirm against the repository
if (nb_stationsfirst.toString(Date::ISO) << " - 1 timestep\n";
}
if (count>1) {
//average time step between the first and the last cached dates (difference of julian dates)
const double avg_sampling = ( (point_cache.rbegin()->first.getJulian()) - (point_cache.begin()->first.getJulian()) ) / (double)(count-1);
os << "Resampled cache content (";
if (max_stations==min_stations)
os << min_stations;
else
os << min_stations << " to " << max_stations;
os << " station(s))\n";
os << std::setw(22) << point_cache.begin()->first.toString(Date::ISO);
os << " - " << point_cache.rbegin()->first.toString(Date::ISO);
//the factor 24*3600 converts the average time step into seconds for the display
os << " - " << count << " timesteps (" << setprecision(3) << fixed << avg_sampling*24.*3600. << " s sampling rate)";
}
os << "\n";
return os.str();
}
} //namespace