WSL/SLF GitLab Repository

Commit 6f1cd6e9 authored by Mathias Bavay's avatar Mathias Bavay
Browse files

The Buffer class is now used to buffer meteo time series. Since this is not...

The Buffer class is now used to buffer meteo time series. Since this is not (yet) a proper ringbuffer, all data is erased before pushing more data into a buffer (similarly to what it was before this commit). It has been tested with data_converter on numerous data (including with virtual stations) as well as with snowpack on operational stations.
parent 0af51718
......@@ -18,15 +18,16 @@
#include <meteoio/TimeSeriesManager.h>
#include <algorithm>
using namespace std;
namespace mio {
TimeSeriesManager::TimeSeriesManager(IOHandler& in_iohandler, const Config& in_cfg) : cfg(in_cfg), iohandler(in_iohandler),
meteoprocessor(in_cfg), dataGenerator(in_cfg),
proc_properties(), point_cache(), filtered_cache(), raw_buffer(),
fcache_start(Date(0.0, 0.)), fcache_end(Date(0.0, 0.)), //this should not matter, since 0 is still way back before any real data...
raw_start(Date(0.0, 0.)), raw_end(Date(0.0, 0.)), chunk_size(), buff_before(),
proc_properties(), point_cache(), raw_buffer(), filtered_cache(),
chunk_size(), buff_before(),
processing_level(IOUtils::filtered | IOUtils::resampled | IOUtils::generated)
{
meteoprocessor.getWindowSize(proc_properties);
......@@ -105,15 +106,10 @@ void TimeSeriesManager::push_meteo_data(const IOUtils::ProcessingLevel& level, c
}
if (level == IOUtils::filtered) {
fcache_start = date_start;
fcache_end = date_end;
filtered_cache = vecMeteo;
filtered_cache.push(date_start, date_end, vecMeteo);
} else if (level == IOUtils::raw) {
fcache_start = fcache_end = Date(0.0, 0.);
filtered_cache.clear();
raw_start = date_start;
raw_end = date_end;
raw_buffer = vecMeteo;
raw_buffer.push(date_start, date_end, vecMeteo);
} else {
throw InvalidArgumentException("The processing level is invalid (should be raw OR filtered)", AT);
}
......@@ -138,25 +134,20 @@ size_t TimeSeriesManager::getStationData(const Date& date, STATIONS_SET& vecStat
size_t TimeSeriesManager::getMeteoData(const Date& dateStart, const Date& dateEnd, std::vector< METEO_SET >& vecVecMeteo)
{
vecVecMeteo.clear();
if (processing_level == IOUtils::raw){
iohandler.readMeteoData(dateStart, dateEnd, vecVecMeteo);
} else {
const bool success = read_filtered_cache(dateStart, dateEnd, vecVecMeteo);
const bool success = filtered_cache.get(dateStart, dateEnd, vecVecMeteo);
if (!success){
vector< vector<MeteoData> > tmp_meteo;
fillRawBuffer(dateStart, dateEnd);
getFromRawBuffer(dateStart, dateEnd, tmp_meteo);
raw_buffer.get(dateStart, dateEnd, tmp_meteo);
//now it needs to be secured that the data is actually filtered, if configured
if ((IOUtils::filtered & processing_level) == IOUtils::filtered){
//we don't use tmp_meteo, but calling fillRawBuffer has filled the buffer for us
//and fill_filtered_cache will directly use raw_buffer
//HACK: if raw_buffer can not hold all data between start and end
//then this would not work
fill_filtered_cache();
read_filtered_cache(dateStart, dateEnd, vecVecMeteo);
filtered_cache.get(dateStart, dateEnd, vecVecMeteo);
} else {
vecVecMeteo = tmp_meteo;
}
......@@ -174,12 +165,13 @@ size_t TimeSeriesManager::getMeteoData(const Date& dateStart, const Date& dateEn
size_t TimeSeriesManager::getMeteoData(const Date& i_date, METEO_SET& vecMeteo)
{
vecMeteo.clear();
vector< vector<MeteoData> > vec_cache;
//1. Check whether user wants raw data or processed data
//The first case: we are looking at raw data directly, only unresampled values are considered, exact date match
if (processing_level == IOUtils::raw) {
iohandler.readMeteoData(i_date-Duration(1./(24.*3600.), 0.), i_date+Duration(1./(24.*3600.), 0.), vec_cache);
vector< vector<MeteoData> > vec_cache;
const Duration eps(1./(24.*3600.), 0.);
iohandler.readMeteoData(i_date-eps, i_date+eps, vec_cache);
for (size_t ii=0; ii<vec_cache.size(); ii++){ //for every station
const size_t index = IOUtils::seek(i_date, vec_cache[ii], true);
if (index != IOUtils::npos)
......@@ -197,19 +189,18 @@ size_t TimeSeriesManager::getMeteoData(const Date& i_date, METEO_SET& vecMeteo)
//Let's make sure we have the data we need, in the filtered_cache or in vec_cache
const Date buffer_start( i_date-proc_properties.time_before ), buffer_end( i_date+proc_properties.time_after );
vector< vector<MeteoData> >* data = NULL; //reference to either filtered_cache or vec_cache
vector< vector<MeteoData> >* data = NULL; //reference to either filtered_cache or raw_buffer
if ((IOUtils::filtered & processing_level) == IOUtils::filtered){
const bool cached = (fcache_start <= buffer_start) && (fcache_end >= buffer_end);
const bool cached = (!filtered_cache.empty()) && (filtered_cache.getBufferStart() <= buffer_start) && (filtered_cache.getBufferEnd() >= buffer_end);
if (!cached) {
//explicit caching, rebuffer if necessary
fillRawBuffer(buffer_start, buffer_end);
fill_filtered_cache();
}
data = &filtered_cache;
data = &filtered_cache.getBuffer();
} else { //data to be resampled should be IOUtils::raw
fillRawBuffer(buffer_start, buffer_end);
getFromRawBuffer(buffer_start, buffer_end, vec_cache);
data = &vec_cache;
data = &raw_buffer.getBuffer();
}
if ((IOUtils::resampled & processing_level) != IOUtils::resampled) { //no resampling required
......@@ -247,26 +238,7 @@ void TimeSeriesManager::writeMeteoData(const std::vector< METEO_SET >& vecMeteo,
double TimeSeriesManager::getAvgSamplingRate() const
{
if (processing_level == IOUtils::raw || raw_buffer.empty())
return IOUtils::nodata;
const size_t nr_stations = raw_buffer.size();
double sum = 0;
for (size_t ii=0; ii<nr_stations; ii++){ //loop over all stations
if(!raw_buffer[ii].empty()) {
const std::vector<MeteoData>& curr_station = raw_buffer[ii];
const double days = curr_station.back().date.getJulian() - curr_station.front().date.getJulian();
//add the average sampling rate for this station
const size_t nr_data_pts = raw_buffer[ii].size();
if(days>0.) sum += (double)(nr_data_pts-1) / days; //the interval story: 2 points define 1 interval!
}
}
if (sum > 0.){
return ((double)sum / (double)(nr_stations*24*3600)); //in points per seconds, ie Hz
}
return IOUtils::nodata;
return raw_buffer.getAvgSamplingRate();
}
/**
......@@ -275,52 +247,11 @@ double TimeSeriesManager::getAvgSamplingRate() const
void TimeSeriesManager::fill_filtered_cache()
{
if ((IOUtils::filtered & processing_level) == IOUtils::filtered){
//ask the bufferediohandler for the whole buffer
fcache_start = raw_start;
fcache_end = raw_end;
const vector< METEO_SET >& buffer( raw_buffer );
meteoprocessor.process(buffer, filtered_cache);
}
}
/**
* @brief Try to cut out a chunk of the time series stored in filtered_cache
* @param start_date The start date of the chunk to be cut out (inclusive)
* @param start_date The end date of the chunk to be cut out (inclusive)
* @param vec_meteo A vector to store the chunk cut out
* @return true if the requested chunk was contained by filtered_cache, false otherwise
*/
bool TimeSeriesManager::read_filtered_cache(const Date& start_date, const Date& end_date, std::vector< METEO_SET >& vec_meteo)
{
if ((start_date >= fcache_start) && (end_date <= fcache_end)){
//it's already in the filtered_cache, so just copy the requested slice
for (size_t ii=0; ii<filtered_cache.size(); ii++){ //loop over stations
size_t startpos = IOUtils::seek(start_date, filtered_cache[ii], false);
if (startpos == IOUtils::npos){
if (!filtered_cache[ii].empty()){
if (filtered_cache[ii][0].date <= end_date){
startpos = 0;
}
}
}
if (startpos != IOUtils::npos){
vec_meteo.push_back(vector<MeteoData>());
for (size_t jj=startpos; jj<filtered_cache[ii].size(); jj++){
const MeteoData& md = filtered_cache[ii][jj];
if (md.date <= end_date){
vec_meteo.back().push_back(md);
} else {
break;
}
}
}
}
return true;
filtered_cache.clear(); //HACK until we get true ringbuffers, to prevent eating up all memory
meteoprocessor.process(raw_buffer.getBuffer(), filtered_cache.getBuffer());
filtered_cache.setBufferStart(raw_buffer.getBufferStart()); //HACK
filtered_cache.setBufferEnd(raw_buffer.getBufferEnd()); //HACK
}
return false;
}
void TimeSeriesManager::add_to_points_cache(const Date& i_date, const METEO_SET& vecMeteo)
......@@ -336,105 +267,45 @@ void TimeSeriesManager::add_to_points_cache(const Date& i_date, const METEO_SET&
void TimeSeriesManager::clear_cache()
{
raw_buffer.clear();
raw_start = Date(0., 0.);
raw_end = Date(0., 0.);
filtered_cache.clear();
fcache_start = Date(0., 0.);
fcache_end = Date(0., 0.);
point_cache.clear();
}
/**
* @brief return all the buffered data between the given dates
* @param date_start requested start date of the buffer
* @param date_end requested end date of the buffer
* @param data vector to fill with the buffered data
*/
void TimeSeriesManager::getFromRawBuffer(const Date& date_start, const Date& date_end, std::vector< METEO_SET > &vecMeteo)
{
//1. Prepare the output vector
const size_t buffer_size = raw_buffer.size();
vecMeteo.clear();
vecMeteo.reserve(buffer_size);
//2. Copy appropriate data into vecMeteo for each station
for (size_t ii=0; ii<buffer_size; ii++){ //loop through stations
vecMeteo.push_back(vector<MeteoData>()); //insert one empty vector of MeteoData
if (raw_buffer[ii].empty()) continue; //no data in buffer for this station
size_t pos_start = IOUtils::seek(date_start, raw_buffer[ii], false);
if (pos_start == IOUtils::npos) pos_start = 0;
size_t pos_end = IOUtils::seek(date_end, raw_buffer[ii], false);//HACK:: edit IOUtils::seek to accept an offset
if (pos_end == IOUtils::npos) pos_end = raw_buffer[ii].size() - 1; //just copy until the end of the buffer
if (raw_buffer[ii][pos_end].date > date_end){
if (pos_end > pos_start) pos_end--;
} else {
pos_end++;
}
vecMeteo[ii].reserve(pos_end-pos_start+1); //weird that the "insert" does not handle it internally...
vecMeteo[ii].insert(vecMeteo[ii].begin(), raw_buffer[ii].begin()+pos_start, raw_buffer[ii].begin()+pos_end);
}
}
void TimeSeriesManager::fillRawBuffer(const Date& date_start, const Date& date_end)
{
const Date new_buffer_start(date_start-buff_before); //taking centering into account
Date new_buffer_end(new_buffer_start + chunk_size);
const Date new_start( date_start-buff_before ); //taking centering into account
const Date new_end( max(new_start + chunk_size, date_end) );
//Read MeteoData for requested interval in chunks, furthermore buffer it
//Try to buffer after the requested chunk for subsequent calls
raw_buffer.clear(); //HACK until we have a proper ring buffer to avoid eating up all memory...
//0. initialize if not already initialized
if (raw_buffer.empty()) {
iohandler.readMeteoData(new_buffer_start, new_buffer_end, raw_buffer);
raw_start = new_buffer_start;
raw_end = new_buffer_end;
vector< METEO_SET > vecMeteo;
iohandler.readMeteoData(new_start, new_end, vecMeteo);
raw_buffer.push(new_start, new_end, vecMeteo);
return;
}
//1. Check whether data is in buffer already, and buffer it if not
if ((date_start < raw_start) || (date_end > raw_end)) {
//rebuffer data
if ((new_buffer_end != raw_end) || (new_buffer_start != raw_start)) { //rebuffer for real
raw_buffer.clear(); //the plugins do it internally anyway, but this is cheap and safe...
iohandler.readMeteoData(new_buffer_start, new_buffer_end, raw_buffer);
raw_start = new_buffer_start;
raw_end = new_buffer_end;
}
const size_t buffer_size = raw_buffer.size();
vector< vector<MeteoData> > tmp_raw_buffer;
while (date_end > new_buffer_end){
//if the requested interval is bigger than a normal buffer, we have to increase the buffer anyway...
tmp_raw_buffer.reserve(buffer_size);
iohandler.readMeteoData(new_buffer_end, new_buffer_end+chunk_size, tmp_raw_buffer);
if (tmp_raw_buffer.size() != buffer_size) {
ostringstream ss;
ss << "The number of stations changed over time from " << buffer_size << " to " << tmp_raw_buffer.size() << ", ";
ss << "this is not handled yet!";
throw IOException(ss.str(), AT);
}
const Date buffer_start( raw_buffer.getBufferStart() );
const Date buffer_end( raw_buffer.getBufferEnd() );
if (new_start>buffer_end || new_end<buffer_start) { //easy: full rebuffer
vector< METEO_SET > vecMeteo;
iohandler.readMeteoData(new_start, new_end, vecMeteo);
raw_buffer.push(new_start, new_end, vecMeteo);
return;
}
//Loop through stations and append data
for (size_t ii=0; ii<buffer_size; ii++){ //loop through stations
if ((!raw_buffer[ii].empty()) && (!tmp_raw_buffer[ii].empty())){
//check if the last element equals the first one
if (raw_buffer[ii].back().date >= tmp_raw_buffer[ii].front().date)
raw_buffer[ii].pop_back(); //delete the element with the same date
}
if (new_start<buffer_start) { //some data must be inserted before
vector< METEO_SET > vecMeteo;
iohandler.readMeteoData(new_start, buffer_start, vecMeteo);
raw_buffer.push(new_start, buffer_start, vecMeteo);
}
raw_buffer[ii].reserve(raw_buffer[ii].size()+tmp_raw_buffer[ii].size());
raw_buffer[ii].insert(raw_buffer[ii].end(), tmp_raw_buffer[ii].begin(), tmp_raw_buffer[ii].end());
}
new_buffer_end += chunk_size;
raw_end = new_buffer_end;
}
if (new_end>buffer_end) { //some data must be inserted after. Keep in mind both before and after could happen simultaneously!
vector< METEO_SET > vecMeteo;
iohandler.readMeteoData(buffer_end, new_end, vecMeteo);
raw_buffer.push(buffer_end, new_end, vecMeteo);
}
}
const std::string TimeSeriesManager::toString() const {
......@@ -446,29 +317,10 @@ const std::string TimeSeriesManager::toString() const {
os << "Processing level = " << processing_level << "\n";
os << dataGenerator.toString();
//display raw_buffer
os << "RawBuffer content (" << raw_buffer.size() << " stations)\n";
for(size_t ii=0; ii<raw_buffer.size(); ii++) {
if (!raw_buffer[ii].empty()){
os << std::setw(10) << raw_buffer[ii].front().meta.stationID << " = "
<< raw_buffer[ii].front().date.toString(Date::ISO) << " - "
<< raw_buffer[ii].back().date.toString(Date::ISO) << ", "
<< raw_buffer[ii].size() << " timesteps" << endl;
}
}
//display filtered_cache
os << "Filteredcache content (" << filtered_cache.size() << " stations)\n";
for(size_t ii=0; ii<filtered_cache.size(); ii++) {
if (!filtered_cache[ii].empty()){
os << std::setw(10) << filtered_cache[ii].front().meta.stationID << " = "
<< filtered_cache[ii].front().date.toString(Date::ISO) << " - "
<< filtered_cache[ii].back().date.toString(Date::ISO) << ", "
<< filtered_cache[ii].size() << " timesteps" << endl;
}
}
os << "RawBuffer:" << raw_buffer.toString();
os << "Filteredcache:" << raw_buffer.toString();
//display meteocache
//display point_cache
size_t count=0;
size_t min_stations=std::numeric_limits<size_t>::max();
size_t max_stations=0;
......
......@@ -22,6 +22,7 @@
#include <meteoio/MeteoProcessor.h>
#include <meteoio/dataClasses/MeteoData.h>
#include <meteoio/dataClasses/Coords.h>
#include <meteoio/dataClasses/Buffer.h>
#include <meteoio/IOHandler.h>
#include <meteoio/Config.h>
......@@ -138,9 +139,6 @@ class TimeSeriesManager {
private:
void setDfltBufferProperties();
void fill_filtered_cache();
bool read_filtered_cache(const Date& start_date, const Date& end_date,
std::vector< METEO_SET >& vec_meteo);
void getFromRawBuffer(const Date& date_start, const Date& date_end, std::vector< METEO_SET > &vecMeteo);
void fillRawBuffer(const Date& date_start, const Date& date_end);
const Config& cfg;
......@@ -150,10 +148,9 @@ class TimeSeriesManager {
ProcessingProperties proc_properties; ///< buffer constraints in order to be able to compute the requested values
std::map<Date, METEO_SET > point_cache; ///< stores already resampled data points
std::vector< METEO_SET > filtered_cache; ///< stores already filtered data intervals
std::vector< METEO_SET > raw_buffer; ///< stores raw data
Date fcache_start, fcache_end; ///< store the beginning and the end date of the filtered_cache
Date raw_start, raw_end; ///< store the beginning and the end date of the raw_buffer
MeteoBuffer raw_buffer; ///< stores raw data
MeteoBuffer filtered_cache; ///< stores already filtered data intervals
Duration chunk_size; ///< How much data to read at once
Duration buff_before; ///< How much data to read before the requested date in buffer
unsigned int processing_level;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment