WSL/SLF GitLab Repository

Commit 441bb162 authored by Mathias Bavay's avatar Mathias Bavay
Browse files

The MERGE feature has been much improved: the syntax now follows the (more...

The MERGE feature has been much improved: the syntax now follows the (more logical) EXCLUDE/KEEP syntax: 
*WFJ::MERGE = WFJ2 WFJ1
The documentation and debug toString() have been updated.
parent 3667cd5b
......@@ -174,24 +174,26 @@ namespace mio {
* @endcode
*
* @subsection data_merging Data merging
* It is possible to merge different data sets together, based on a common station name. This is enabled with the key \em MERGE_BY_NAME in the [Input] section.
* If set to \em true, all stations that have the same station name will be merged together, in the order they have been declared/read by the plugin
* (ie the first station that has a value for a given parameter has priority). This is useful, for example, to provide measurements from different
* stations that actually share the same measurement location or to build "composite" station from multiple real stations.
* It is possible to merge different data sets together, with a syntax similar to the Exclude/Keep syntax. This merging occurs <b>after</b> any
* EXCLUDE/KEEP commands. This is useful, for example, to provide measurements from different stations that actually share the
* same measurement location or to build "composite" station from multiple real stations (in this case, using EXCLUDE and/or KEEP
* commands to fine tune how the composite station(s) is/are built).
* Please note that the order of declaration defines the priority (ie the first station that has a value for a given parameter has priority).
*
* @code
* STATION1 = *WFJ
* STATION2 = WFJ2
* STATION3 = WFJ3
* STATION3 = WFJ1
* [...]
*
* *WFJ::KEEP = ILWR PSUM
* WFJ2::EXCLUDE = PSUM ILWR RSWR
* WFJ1::KEEP = ISWR VW DW
*
* MERGE_BY_NAME = true
* *WFJ::MERGE = WFJ2 WFJ1
* DRB2::MERGE = DRB1 WFJ2
* @endcode
* In the above example, if the same station name would have been given to "*WFJ", "WFJ2" and "WFJ1", then a composite station with station ID
* "*WFJ" (ie the first one of the list) would be built with ILWR and PSUM coming from the original *WFJ station, every fields of WFJ2 excepted PSUM, ILWR and ISWR
* and only ISWR, VW, DW from WFJ1 <b>when</b> WFJ2 does not have them.
* In order to avoid circular dependencies, a station can NOT receive data from a station AND contribute data to another station. Otherwise, a
* station can be merged into multiple other stations.
*/
IOInterface* IOHandler::getPlugin(const std::string& plugin_name) const
......@@ -276,17 +278,18 @@ IOInterface* IOHandler::getPlugin(const std::string& cfgkey, const std::string&
//Copy constructor
IOHandler::IOHandler(const IOHandler& aio)
: IOInterface(), cfg(aio.cfg), mapPlugins(aio.mapPlugins), excluded_params(aio.excluded_params), kept_params(aio.kept_params),
: IOInterface(), cfg(aio.cfg), mapPlugins(aio.mapPlugins), excluded_params(aio.excluded_params), kept_params(aio.kept_params),
merge_commands(aio.merge_commands), merged_stations(aio.merged_stations),
copy_parameter(aio.copy_parameter), copy_name(aio.copy_name), enable_copying(aio.enable_copying),
excludes_ready(aio.excludes_ready), keeps_ready(aio.keeps_ready), mergeByName(aio.mergeByName)
excludes_ready(aio.excludes_ready), keeps_ready(aio.keeps_ready), merge_ready(aio.merge_ready)
{}
IOHandler::IOHandler(const Config& cfgreader)
: IOInterface(), cfg(cfgreader), mapPlugins(), excluded_params(), kept_params(), copy_parameter(), copy_name(),
enable_copying(false), excludes_ready(false), keeps_ready(false), mergeByName(false)
: IOInterface(), cfg(cfgreader), mapPlugins(), excluded_params(), kept_params(),
merge_commands(), merged_stations(), copy_parameter(), copy_name(),
enable_copying(false), excludes_ready(false), keeps_ready(false), merge_ready(false)
{
parse_copy_config();
cfg.getValue("MERGE_BY_NAME", "Input", mergeByName, IOUtils::nothrow);
}
IOHandler::~IOHandler() throw()
......@@ -303,12 +306,14 @@ IOHandler& IOHandler::operator=(const IOHandler& source) {
mapPlugins = source.mapPlugins;
excluded_params = source.excluded_params;
kept_params = source.kept_params;
merge_commands = source.merge_commands;
merged_stations = source.merged_stations;
copy_parameter = source.copy_parameter;
copy_name = source.copy_name;
enable_copying = source.enable_copying;
excludes_ready = source.excludes_ready;
keeps_ready = source.keeps_ready;
mergeByName = source.mergeByName;
merge_ready = source.merge_ready;
}
return *this;
}
......@@ -342,7 +347,9 @@ void IOHandler::readStationData(const Date& date, STATIONS_SET& vecStation)
{
IOInterface *plugin = getPlugin("METEO", "Input");
plugin->readStationData(date, vecStation);
if (mergeByName) merge_by_name(vecStation);
if (!merge_ready) create_merge_map();
merge_stations(vecStation);
}
void IOHandler::readMeteoData(const Date& dateStart, const Date& dateEnd,
......@@ -360,7 +367,8 @@ void IOHandler::readMeteoData(const Date& dateStart, const Date& dateEnd,
if (!keeps_ready) create_keep_map();
keep_params(vecMeteo);
if (mergeByName) merge_by_name(vecMeteo);
if (!merge_ready) create_merge_map();
merge_stations(vecMeteo);
copy_parameters(stationindex, vecMeteo);
}
......@@ -416,40 +424,110 @@ void IOHandler::checkTimestamps(const std::vector<METEO_SET>& vecVecMeteo) const
}
}
void IOHandler::create_merge_map()
{
merge_ready = true;
vector<string> merge_keys;
const size_t nrOfStations = cfg.findKeys(merge_keys, "::MERGE", "Input", true);
for (size_t ii=0; ii<nrOfStations; ++ii) {
const size_t found = merge_keys[ii].find_first_of(":");
if (found==std::string::npos) continue;
const string station( merge_keys[ii].substr(0,found) );
std::vector<std::string> vecString;
cfg.getValue(merge_keys[ii], "Input", vecString);
if (vecString.empty()) throw InvalidArgumentException("Empty value for key \""+merge_keys[ii]+"\"", AT);
for (vector<string>::iterator it = vecString.begin(); it != vecString.end(); ++it) {
IOUtils::toUpper(*it);
const std::vector<std::string>::const_iterator vec_it = find (merged_stations.begin(), merged_stations.end(), *it);
if (vec_it==merged_stations.end()) merged_stations.push_back( *it ); //this station will be merged into another one
}
merge_commands[ station ] = vecString;
}
//sort the merged_stations vector so searches will be faster
std::sort(merged_stations.begin(), merged_stations.end());
//make sure there is no "chain merge": station A merging station B and station C merging station A
std::map< std::string, std::vector<std::string> >::iterator it_dest;
for(it_dest=merge_commands.begin(); it_dest!=merge_commands.end(); ++it_dest) {
const string &stationID = it_dest->first;
if (std::binary_search(merged_stations.begin(), merged_stations.end(), stationID))
throw InvalidArgumentException("\'chain merge\' detected for station \'"+stationID+"\', this is not supported (see documentation)", AT);
}
}
//merge stations that have identical names
void IOHandler::merge_by_name(STATIONS_SET& vecStation) const
void IOHandler::merge_stations(STATIONS_SET& vecStation) const
{
const size_t idxMiddle = Optim::ceil( static_cast<double>(vecStation.size()) / 2. );
for (size_t ii=0; (ii<=idxMiddle) && (ii<vecStation.size()); ii++) {
for (size_t jj=ii+1; jj<vecStation.size(); jj++) {
if (vecStation[ii].stationName==vecStation[jj].stationName) {
vecStation[ii].merge( vecStation[jj] );
std::swap( vecStation[jj], vecStation.back() );
vecStation.pop_back();
jj--; //we need to re-compare the current station since it has been swapped
for (size_t ii=0; ii<vecStation.size(); ii++) {
const string toStationID = vecStation[ii].stationID;
//we do not support "chain merge": station A merging station B and station C merging station A
if ( std::find(merged_stations.begin(), merged_stations.end(), toStationID)!=merged_stations.end() ) continue;
const map< string, vector<string> >::const_iterator it = merge_commands.find( toStationID );
if (it == merge_commands.end()) continue; //no merge commands for this station
const vector<string> &merge_from( it->second );
for (size_t idx=0; idx<merge_from.size() ; ++idx) {
const string fromStationID( merge_from[idx] );
for (size_t jj=0; jj<vecStation.size(); jj++) {
if (vecStation[jj].stationID==fromStationID) vecStation[ii].merge( vecStation[jj] );
}
}
}
//remove the stations that have been merged into other ones
for (size_t ii=0; ii<vecStation.size(); ii++) {
const string toStationID = vecStation[ii].stationID;
const vector<string>::const_iterator it = std::find(merged_stations.begin(), merged_stations.end(), toStationID);
if ( it!=merged_stations.end() ) {
std::swap( vecStation[ii], vecStation.back() );
vecStation.pop_back();
ii--; //in case we have multiple identical stations ID
}
}
}
//in this implementation, we consider that the station name does NOT change over time
void IOHandler::merge_by_name(std::vector<METEO_SET>& vecVecMeteo) const
void IOHandler::merge_stations(std::vector<METEO_SET>& vecVecMeteo) const
{
const size_t idxMiddle = Optim::ceil( static_cast<double>(vecVecMeteo.size()) / 2. );
for (size_t ii=0; (ii<=idxMiddle) && (ii<vecVecMeteo.size()); ii++) {
for (size_t ii=0; ii<vecVecMeteo.size(); ii++) {
if (vecVecMeteo[ii].empty()) continue;
for (size_t jj=ii+1; jj<vecVecMeteo.size(); jj++) {
if (vecVecMeteo[jj].empty()) continue;
const string toStationID = vecVecMeteo[ii][0].meta.stationID;
//we do not support "chain merge": station A merging station B and station C merging station A
if ( std::find(merged_stations.begin(), merged_stations.end(), toStationID)!=merged_stations.end() ) continue;
const map< string, vector<string> >::const_iterator it = merge_commands.find( toStationID );
if (it == merge_commands.end()) continue; //no merge commands for this station
const vector<string> &merge_from( it->second );
for (size_t idx=0; idx<merge_from.size(); ++idx) {
const string fromStationID( merge_from[idx] );
if (vecVecMeteo[ii][0].meta.stationName==vecVecMeteo[jj][0].meta.stationName) {
MeteoData::mergeTimeSeries(vecVecMeteo[ii], vecVecMeteo[jj]);
std::swap( vecVecMeteo[jj], vecVecMeteo.back() );
vecVecMeteo.pop_back();
jj--; //we need to re-compare the current station since it has been swapped
for (size_t jj=0; jj<vecVecMeteo.size(); jj++) {
if (vecVecMeteo[jj].empty()) continue;
if (vecVecMeteo[jj][0].meta.stationID==fromStationID)
MeteoData::mergeTimeSeries(vecVecMeteo[ii], vecVecMeteo[jj]);
}
}
}
//remove the stations that have been merged into other ones
for (size_t ii=0; ii<vecVecMeteo.size(); ii++) {
const string toStationID = vecVecMeteo[ii][0].meta.stationID;
const vector<string>::const_iterator it = std::find(merged_stations.begin(), merged_stations.end(), toStationID);
if ( it!=merged_stations.end() ) {
std::swap( vecVecMeteo[ii], vecVecMeteo.back() );
vecVecMeteo.pop_back();
ii--; //in case we have multiple identical stations ID
}
}
}
void IOHandler::create_exclude_map()
......@@ -732,8 +810,17 @@ const std::string IOHandler::toString() const
os << "</excluded_params>\n";
}
if (mergeByName)
os << "Merge stations by stationName\n";
if (!merge_commands.empty()) {
os << "<merge_commands>\n";
std::map< std::string, std::vector<std::string> >::const_iterator it_merge;
for (it_merge=merge_commands.begin(); it_merge != merge_commands.end(); ++it_merge) {
os << setw(10) << it_merge->first << " <- ";
for (size_t ii=0; ii<it_merge->second.size(); ++ii)
os << it_merge->second[ii] << " ";
os << "\n";
}
os << "</merge_commands>\n";
}
os << "</IOHandler>\n";
return os.str();
......
......@@ -69,19 +69,21 @@ class IOHandler : public IOInterface {
void parse_copy_config();
void create_exclude_map();
void create_keep_map();
void create_merge_map();
void checkTimestamps(const std::vector<METEO_SET>& vecVecMeteo) const;
void exclude_params(std::vector<METEO_SET>& vecVecMeteo) const;
void keep_params(std::vector<METEO_SET>& vecVecMeteo) const;
void copy_parameters(const size_t& stationindex, std::vector< METEO_SET >& vecMeteo) const;
void merge_by_name(std::vector<METEO_SET>& vecVecMeteo) const;
void merge_by_name(STATIONS_SET& vecStation) const;
void merge_stations(std::vector<METEO_SET>& vecVecMeteo) const;
void merge_stations(STATIONS_SET& vecStation) const;
const Config& cfg;
std::map<std::string, IOInterface*> mapPlugins;
std::map< std::string, std::set<std::string> > excluded_params;
std::map< std::string, std::set<std::string> > kept_params;
std::vector<std::string> copy_parameter, copy_name;
bool enable_copying, excludes_ready, keeps_ready, mergeByName;
std::map< std::string, std::vector<std::string> > merge_commands;
std::vector<std::string> merged_stations, copy_parameter, copy_name;
bool enable_copying, excludes_ready, keeps_ready, merge_ready;
};
} //namespace
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment