WSL/SLF GitLab Repository

Commit c6a2c507 authored by Thomas Egger's avatar Thomas Egger
Browse files

PSQLIO: Adding initial capability to write data into the database

parent 7af06c60
......@@ -151,7 +151,7 @@ void PSQLIO::getParameters(const Config& cfg)
cfg.getValue("PSQL_PASS", "Input", passwd);
string stations;
cfg.getValue("STATIONS", "Input", stations);
cfg.getValue("STATIONS", "Input", stations, IOUtils::nothrow);
IOUtils::readLineToVec(stations, vecFixedStationID, ',');
string exclude_file;
......@@ -235,28 +235,9 @@ void PSQLIO::readAssimilationData(const Date& /*date_in*/, Grid2DObject& /*da_ou
throw IOException("Nothing implemented here", AT);
}
void PSQLIO::readStationData(const Date&, std::vector<StationData>& vecStation)
void PSQLIO::readMetaData(const std::string& query, std::vector<StationData>& vecStation)
{
if (!vecMeta.empty()) {
vecStation = vecMeta;
return;
}
vecStation.clear();
string station_list;
if (vecFixedStationID.empty() && vecMobileStationID.empty()) {
return; //nothing to do
} else {
for (vector<string>::const_iterator it = vecFixedStationID.begin(); it != vecFixedStationID.end(); ++it) {
if (it != vecFixedStationID.begin()) {
station_list += ", ";
}
station_list += "'" + *it + "'";
}
}
PGresult *result = get_data(sql_meta + " (" + station_list + ") ORDER BY id;");
PGresult *result = sql_exec(query);
if (result) {
int rows = PQntuples(result);
......@@ -271,7 +252,6 @@ void PSQLIO::readStationData(const Date&, std::vector<StationData>& vecStation)
throw IOException("Result set does not have all necessary columns", AT);
}
vector<StationData> tmp_station;
for (int ii=0; ii<rows; ii++) {
int epsg;
double easting, northing, altitude;
......@@ -286,21 +266,44 @@ void PSQLIO::readStationData(const Date&, std::vector<StationData>& vecStation)
point.setXY(easting, northing, altitude);
StationData sd(point, PQgetvalue(result, ii, col_id), PQgetvalue(result, ii, col_name));
tmp_station.push_back(sd); //this is ordered ascending by id
vecStation.push_back(sd); //this is ordered ascending by id
}
//order according to station numbers in io.ini, PGresult is not ordered
PQclear(result);
}
}
void PSQLIO::readStationData(const Date&, std::vector<StationData>& vecStation)
{
if (!vecMeta.empty()) {
vecStation = vecMeta;
return;
}
vecStation.clear();
string station_list;
if (vecFixedStationID.empty() && vecMobileStationID.empty()) {
return; //nothing to do
} else {
for (vector<string>::const_iterator it = vecFixedStationID.begin(); it != vecFixedStationID.end(); ++it) {
if (it != vecFixedStationID.begin()) {
station_list += ", ";
}
station_list += "'" + *it + "'";
for (vector<StationData>::const_iterator station_it = tmp_station.begin(); station_it != tmp_station.end(); ++station_it) {
if ((*station_it).stationID == *it) {
vecStation.push_back(*station_it);
}
}
}
string query = sql_meta + " (" + station_list + ") ORDER BY id;";
vector<StationData> tmp_station;
readMetaData(query, tmp_station);
for (vector<string>::const_iterator it = vecFixedStationID.begin(); it != vecFixedStationID.end(); ++it) {
for (vector<StationData>::const_iterator station_it = tmp_station.begin(); station_it != tmp_station.end(); ++station_it) {
if ((*station_it).stationID == *it) {
vecStation.push_back(*station_it);
}
}
PQclear(result);
}
}
......@@ -353,7 +356,7 @@ void PSQLIO::readData(const Date& dateStart, const Date& dateEnd, std::vector<Me
replace(sql_query, "DATE_START", date_start);
replace(sql_query, "DATE_END", date_end);
PGresult *result = get_data(sql_query);
PGresult *result = sql_exec(sql_query);
if (result) {
int rows = PQntuples(result);
int columns = PQnfields(result);
......@@ -441,39 +444,263 @@ void PSQLIO::map_parameters(PGresult* result, MeteoData& md, std::vector<size_t>
*/
bool PSQLIO::checkConsistency(const std::vector<MeteoData>& vecMeteo, StationData& sd)
{
if (!vecMeteo.empty()) //to get the station data even when in bug 87 conditions
if (!vecMeteo.empty()) // to get the station data even when in bug 87 conditions
sd = vecMeteo[0].meta;
for (size_t ii=1; ii<vecMeteo.size(); ii++){
const Coords& p1 = vecMeteo[ii-1].meta.position;
const Coords& p2 = vecMeteo[ii].meta.position;
if (p1 != p2) {
//we don't mind if p1==nodata or p2==nodata
if(p1.isNodata()==false && p2.isNodata()==false) return false;
if (p1.isNodata()==false && p2.isNodata()==false) return false;
}
}
return true;
}
void PSQLIO::writeMeteoData(const std::vector< std::vector<MeteoData> >& /*vecMeteo*/, const std::string&)
void PSQLIO::checkForUsedParameters(const std::vector<MeteoData>& vecMeteo, std::vector<bool>& vecParamInUse, std::vector<std::string>& vecColumnName)
{
//Nothing so far
throw IOException("Nothing implemented here", AT);
if (vecMeteo.empty()) return;
/**
* This procedure loops through all MeteoData objects present in vecMeteo and finds out which
* meteo parameters are actually in use, i. e. have at least one value that differs from IOUtils::nodata.
* If a parameter is in use, then vecParamInUse[index_of_parameter] is set to true and the column
* name is set in vecColumnName[index_of_parameter]
*/
size_t nr_of_parameters = vecMeteo[0].getNrOfParameters();
vecParamInUse.resize(nr_of_parameters, false);
vecColumnName.resize(nr_of_parameters, "NULL");
for (size_t ii=0; ii<vecMeteo.size(); ii++) {
for (size_t jj=0; jj<nr_of_parameters; jj++) {
if (!vecParamInUse[jj]) {
if (vecMeteo[ii](jj) != IOUtils::nodata) {
vecParamInUse[jj] = true;
vecColumnName.at(jj) = vecMeteo[ii].getNameForParameter(jj);
//cout << "Used Parameter: " << vecColumnName.at(jj) << endl;
}
}
}
}
}
size_t PSQLIO::checkExistence(const std::vector<StationData>& vec_stations, const StationData& sd)
{
for (size_t ii=0; ii<vec_stations.size(); ii++) {
if (sd == vec_stations[ii]) return ii;
}
return IOUtils::npos;
}
void PSQLIO::add_meta_data(const unsigned int& index, const StationData& sd)
{
string stationName = (sd.stationName != "" ? sd.stationName : sd.stationID);
stringstream values;
values << "(" << index << ","
<< "'" << stationName << "'," << fixed
<< setprecision(2) << sd.position.getEasting() << ","
<< sd.position.getNorthing() << ","
<< sd.position.getAltitude() << ","
<< sd.position.getEPSG() << ")";
string query = "INSERT INTO FIXED_STATION (ID_FIXED_STATION,STATION_NAME,COORD_X,COORD_Y,ALTITUDE, EPSG) VALUES " + values.str() + ";";
//cout << "EXEC: " << query << endl;
sql_exec(query);
}
int PSQLIO::get_sensor_index()
{
int sensor_index = 1;
string query = "SELECT max(id_fixed_sensor) from fixed_sensor;";
PGresult *result = sql_exec(query);
if (result) {
int rows = PQntuples(result);
int columns = PQnfields(result);
if (rows != 1 || columns != 1) {
throw IOException("ERROR", AT);
}
string val( PQgetvalue(result, 0, 0) );
IOUtils::convertString(sensor_index, val);
sensor_index++;
PQclear(result);
}
//cout << "Sensor_index: " << sensor_index << endl;
return sensor_index;
}
void PSQLIO::add_sensors(const unsigned int& index, const std::vector<std::string>& vecColumnName, std::map<size_t, std::string>& map_sensor_id)
{
string query = "SELECT id_measurement_type as id, meas_name from measurement_type order by id asc;";
int sensor_index = get_sensor_index();
stringstream ss;
ss << index;
string station_id = ss.str();
std::map<size_t, string> map_sensor_type;
PGresult *result = sql_exec(query);
if (result) {
int rows = PQntuples(result);
//int columns = PQnfields(result);
for (int ii=0; ii<rows; ii++) {
string id( PQgetvalue(result, ii, 0) );
string type( PQgetvalue(result, ii, 1) );
IOUtils::toUpper(type);
IOUtils::trim(type);
for (size_t jj=0; jj<vecColumnName.size(); jj++) {
if (type == vecColumnName[jj]) map_sensor_type[jj] = id;
if (type == "IPREC" && vecColumnName[jj] == "HNW") map_sensor_type[jj] = id;
}
}
PQclear(result);
} else {
throw;
}
// Now actually add all sensors that were identified
for (map<size_t, string>::const_iterator it = map_sensor_type.begin(); it != map_sensor_type.end(); ++it) {
ss.str("");
ss << sensor_index;
string sensor_id = ss.str();
string type = it->second;
query = "INSERT INTO FIXED_SENSOR (ID_FIXED_SENSOR,FK_ID_FIXED_STATION,FK_ID_MEASUREMENT_TYPE,MEAS_HEIGHT) VALUES (" + sensor_id + "," + station_id + "," + type + ",0.0);";
//cout << "EXEC: " << query << endl;
sql_exec(query);
map_sensor_id[it->first] = sensor_id;
sensor_index++;
}
}
void PSQLIO::get_sensors(const unsigned int& index, const std::vector<std::string>& vecColumnName, std::map<size_t, std::string>& map_sensor_id)
{
}
int PSQLIO::get_measurement_index()
{
int index = 1;
string query = "SELECT MAX(ID_FIXED_MEASUREMENT) from fixed_measurement;";
PGresult *result = sql_exec(query);
if (result) {
int rows = PQntuples(result);
int columns = PQnfields(result);
if (rows != 1 || columns != 1) {
throw IOException("ERROR", AT);
}
string val( PQgetvalue(result, 0, 0) );
IOUtils::convertString(index, val);
index++;
PQclear(result);
}
//cout << "Measurement index: " << index << endl;
return index;
}
void PSQLIO::writeMeteoData(const std::vector< std::vector<MeteoData> >& vecMeteo, const std::string&)
{
// Make sure we have an up to date set of all the stations already available in the DB
vector<StationData> vecAllStations;
readMetaData("select id_fixed_station as id, station_name as name, coord_x as x, coord_y as y, altitude, epsg from fixed_station ORDER BY id;", vecAllStations);
unsigned int index = 1;
if (vecAllStations.size()) {
cout << "Found " << vecAllStations.size() << " stations overall, highest id: " << vecAllStations[vecAllStations.size()-1].stationID << endl;
IOUtils::convertString(index, vecAllStations[vecAllStations.size()-1].stationID);
index++;
}
for (size_t ii=0; ii<vecMeteo.size(); ii++){
if (!vecMeteo[ii].size()) continue; // in case there is no data, we can't write anything to DB
//Loop through all stations
/*for (size_t ii=0; ii<vecMeteo.size(); ii++){
//1. check consistency of station data position -> write location in header or data section
StationData sd;
vector<bool> vecParamInUse;
vector<string> vecColumnName;
map<size_t, string> map_sensor_id;
sd.position.setProj(coordout, coordoutparam);
const bool isConsistent = checkConsistency(vecMeteo.at(ii), sd); // sd will hold valid meta info
const bool isConsistent = checkConsistency(vecMeteo[ii], sd); // sd will hold valid meta info
size_t present_index = checkExistence(vecAllStations, sd);
checkForUsedParameters(vecMeteo[ii], vecParamInUse, vecColumnName);
if (isConsistent) { //static station
if (isConsistent) { //static station
present_index = IOUtils::npos;
if (present_index == IOUtils::npos) { //write into fixed_station
cout << "Inserting data for station '" << sd.stationName << "' with the id_fixed_station " << index << endl;
add_meta_data(index, sd);
add_sensors(index, vecColumnName, map_sensor_id);
index++;
} else { // just get the sensor mappings
get_sensors((int)present_index, vecColumnName, map_sensor_id);
}
} else { //mobile station
throw IOException("Mobile station writing not implemented", AT);
}
int currentid = get_measurement_index();
stringstream ss;
string query = "INSERT INTO FIXED_MEASUREMENT (ID_FIXED_MEASUREMENT,FK_ID_FIXED_SENSOR,MEAS_DATE,MEAS_VALUE) VALUES ";
bool comma = false;
for (size_t jj=0; jj<vecMeteo[ii].size(); jj++) {
MeteoData tmp(vecMeteo[ii][jj]);
convertUnitsBack(tmp);
string timestamp(vecMeteo[ii][jj].date.toString(Date::ISO));
std::replace( timestamp.begin(), timestamp.end(), 'T', ' ');
for (map<size_t, string>::const_iterator it = map_sensor_id.begin(); it != map_sensor_id.end(); ++it) {
ss.str("");
ss << currentid;
string id(ss.str());
ss.str("");
ss << tmp(it->first);
string value(ss.str());
string values = "(" + id + "," + it->second + ", TIMESTAMP '" + timestamp + "'," + value + ")";
if (!comma) {
comma = true;
} else {
query += ",";
}
query += values;
currentid++;
}
}
}*/
query += ";";
//cout << "EXEC: " << query << endl;
sql_exec(query);
}
}
void PSQLIO::readPOI(std::vector<Coords>&)
......@@ -494,6 +721,34 @@ void PSQLIO::write2DGrid(const Grid2DObject& /*grid_in*/, const MeteoGrids::Para
throw IOException("Nothing implemented here", AT);
}
void PSQLIO::convertUnitsBack(MeteoData& meteo)
{
//converts Kelvin to °C, converts RH to [0,100]
double& ta = meteo(MeteoData::TA);
if (ta != IOUtils::nodata)
ta = K_TO_C(ta);
double& tsg = meteo(MeteoData::TSG);
if (tsg != IOUtils::nodata)
tsg = K_TO_C(tsg);
double& tss = meteo(MeteoData::TSS);
if (tss != IOUtils::nodata)
tss = K_TO_C(tss);
double& rh = meteo(MeteoData::RH);
if (rh != IOUtils::nodata)
rh *= 100.;
double& hs = meteo(MeteoData::HS);
if (hs != IOUtils::nodata)
hs *= 100.; //is in cm
double& p = meteo(MeteoData::P);
if (p != IOUtils::nodata)
p /= 100.; //is in mbar
}
void PSQLIO::convertUnits(MeteoData& meteo)
{
//converts °C to Kelvin, converts RH to [0,1]
......@@ -542,7 +797,7 @@ void PSQLIO::open_connection()
}
}
PGresult *PSQLIO::get_data(const string& sql_command)
PGresult *PSQLIO::sql_exec(const string& sql_command)
{
open_connection();
......@@ -556,8 +811,11 @@ PGresult *PSQLIO::get_data(const string& sql_command)
// options.align = 1; /* Pad short columns for alignment */
// options.fieldSep = "|"; /* Use a pipe as the field separator */
// PQprint(stdout, result, &options);
} else if (status == PGRES_COMMAND_OK) {
// other command like insert executed
//cout << "Successful completion of a command returning no data." << endl;
} else {
cout << "ERROR while executing the following sql statement: " << sql_command << endl;
//cout << "BAD SELECT: " << PQresStatus(status) << endl;
PQclear(result);
return NULL;
......
......@@ -66,15 +66,24 @@ class PSQLIO : public IOInterface {
void getParameters(const Config& cfg);
void create_shadow_map(const std::string& exclude_file);
void open_connection();
PGresult* get_data(const std::string& sqlcommand);
PGresult* sql_exec(const std::string& sqlcommand);
static bool replace(std::string& str, const std::string& from, const std::string& to);
void readData(const Date& dateStart, const Date& dateEnd, std::vector<MeteoData>& vecMeteo, const size_t& stationindex);
void readMetaData(const std::string& query, std::vector<StationData>& vecStation);
void add_meta_data(const unsigned int& index, const StationData& sd);
void map_parameters(PGresult* result, MeteoData& md, std::vector<size_t>& index);
static void parse_row(PGresult* result, const int& row, const int& cols,
MeteoData& md, std::vector<size_t>& index, std::vector<mio::MeteoData>& vecMeteo);
void close_connection(PGconn *conn);
static bool checkConsistency(const std::vector<MeteoData>& vecMeteo, StationData& sd);
static size_t checkExistence(const std::vector<StationData>& vec_stations, const StationData& sd);
static void convertUnits(MeteoData& meteo);
static void convertUnitsBack(MeteoData& meteo);
static void checkForUsedParameters(const std::vector<MeteoData>& vecMeteo, std::vector<bool>& vecParamInUse, std::vector<std::string>& vecColumnName);
void add_sensors(const unsigned int& index, const std::vector<std::string>& vecColumnName, std::map<size_t, std::string>& map_sensor_id);
int get_sensor_index();
int get_measurement_index();
void get_sensors(const unsigned int& index, const std::vector<std::string>& vecColumnName, std::map<size_t, std::string>& map_sensor_id);
std::string coordin, coordinparam, coordout, coordoutparam; //projection parameters
std::string endpoint, port, dbname, userid, passwd; ///< Variables for endpoint configuration
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment