WSL/SLF GitLab Repository

PSQLIO.cc 27.5 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/***********************************************************************************/
/*  Copyright 2009 WSL Institute for Snow and Avalanche Research    SLF-DAVOS      */
/***********************************************************************************/
/* This file is part of MeteoIO.
    MeteoIO is free software: you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    MeteoIO is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with MeteoIO.  If not, see <http://www.gnu.org/licenses/>.
*/
18
19
#include <set>
#include <algorithm>
20
21
22
23
24
25
#include "PSQLIO.h"

using namespace std;

namespace mio {
/**
26
 * @page psqlio PSQLIO
27
 * @section psql_format Format
28
29
30
 * This plugin connects to a <i>generic</i> <A HREF="http://www.postgresql.org/">PostgreSQL</A> server to retrieve its meteorological data. The server
 * parameters must be provided as well as the queries to retrieve the stations' data and metadata. In order to compile this plugin,
 * the development package of libpq is required (this is the PostgreSQL C client library).
31
 *
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
 * @subsection psql_meta_query Metadata query
 * This query is used to retrieve the stations' metadata. This SQL query string should retrieve the following columns as result set (in this very order):
 *
 *      id (int), name (string), x (easting as double), y (northing as double), altitude (height above sea level as double), epsg (int)
 *
 * The user is allowed to select stations with the STATIONS keyword (see below). That is why the SQL query has to end with a 'WHERE id_column_name IN' clause, for example:
 *
 *      SELECT id, station_name AS name, x_coord AS x, y_coord AS y, z AS altitude, epsg from all_stations WHERE id IN
 *
 * @subsection psql_data_query Data query
 * This query is used to retrieve the data for the user selected stations within a given time interval.
 * The SQL query may retrieve the following columns as result set (any order, only date is mandatory):
 *
 *      date (mandatory, as date), ta (double), rh (double), p (double), vw (double), dw (double), iprec (the HNW value, double), iswr (double)
 *
 * The SQL query must retrieve the data for one station only, which has to be specified as \a STATIONID (this will be dynamically replaced by the plugin).
 * To set the upper and lower bounds for the date the SQL query has to contain \a DATE_START and \a DATE_END. These keywords will be dynamically replaced by
 * the plugin with the correct date. Furthermore the resultset should be ordered by date ascending. An example for a correct SQL data query string is therefore:
 *
 *      SELECT * FROM all_measurements WHERE id = ''STATIONID'' AND date>=''DATE_START'' AND date<=''DATE_END'' ORDER BY date
 *
 * @subsection psql_exclude_file Exclude file
 * It is possible to exclude specific parameters from specific stations. This is done by listing in a CSV file for each station id, which parameters should be excluded.
 * An example of an exclude file is given below:
 * @code
 *       # Example of an exclude file, comment line
 *       ; another comment line
 *       ; the parameters to exclude are specified as comma separated values:
 *       # stationid,parameter1,parameter2
 *       1,p,RH,Iprec
 *       230,RH
 *       231,RH
 * @endcode
 *
66
 * @section psql_units Units
67
68
69
 * Units are assumed to be pure SI, except:
 *  - temperatures in °C
 *  - relative humidity in %
70
71
 *  - snow height in cm
 *  - pressure in mbar
72
73
74
 *
 * @section psql_keywords Keywords
 * This plugin uses the following keywords:
75
76
77
78
79
80
81
82
83
 * - COORDSYS: coordinate system (see Coords); [Input] section
 * - COORDPARAM: extra coordinates parameters (see Coords); [Input] section
 * - database connection keywords; [Input] section:
 *      - PSQL_URL: The URL or IP of the database server
 *      - PSQL_PORT: the port to use to connect
 *      - PSQL_DB: The name of the database to access
 *      - PSQL_USER: The username to access the server
 *      - PSQL_PASS: The password to authenticate the PSQL_USER
 * - database structure keywords; [Input] section
84
85
 *      - SQL_META: SQL query to use to get the stations' metadata.
 *      - SQL_DATA: SQL query to use to get the stations' data.
86
 * - STATIONS: comma separated list of station ids that the user is interested in; [Input] section
87
88
 * - EXCLUDE: File containing a list of parameters to exclude listed per station id (optional; [Input] section)
 *
89
90
91
92
 */

/// Value (-999) defined by this plugin as its nodata marker.
const double PSQLIO::plugin_nodata = -999.; //plugin specific nodata value. It can also be read by the plugin (depending on what is appropriate)

93
94
/**
 * @brief Build the plugin from a configuration file.
 * All members start out default-constructed (no connection handle, time zone 1);
 * the projection parameters and the plugin's own keys are then read from the file.
 * @param configfile path of the configuration file that is handed to Config
 */
PSQLIO::PSQLIO(const std::string& configfile)
        : coordin(), coordinparam(), coordout(), coordoutparam(),
          endpoint(), port(), dbname(), userid(), passwd(),
          psql(NULL), default_timezone(1.), vecMeta(),
          vecFixedStationID(), vecMobileStationID(),
          sql_meta(), sql_data(), shadowed_parameters()
{
	const Config settings(configfile);

	IOUtils::getProjectionParameters(settings, coordin, coordinparam, coordout, coordoutparam);
	getParameters(settings);
}

102
103
/**
 * @brief Build the plugin from an already parsed configuration.
 * All members start out default-constructed (no connection handle, time zone 1);
 * the projection parameters and the plugin's own keys are then read from cfg.
 * @param cfg configuration object to read the keys from
 */
PSQLIO::PSQLIO(const Config& cfg)
        : coordin(), coordinparam(), coordout(), coordoutparam(),
          endpoint(), port(), dbname(), userid(), passwd(),
          psql(NULL), default_timezone(1.), vecMeta(),
          vecFixedStationID(), vecMobileStationID(),
          sql_meta(), sql_data(), shadowed_parameters()
{
	IOUtils::getProjectionParameters(cfg, coordin, coordinparam, coordout, coordoutparam);

	getParameters(cfg);
}

110
111
112
113
114
/**
 * @brief Copy constructor.
 * Every configuration member is copied from the source object, including the
 * default time zone (it used to be reset to 1, silently dropping a configured TIME_ZONE).
 * The connection handle is deliberately NOT copied: the new object starts with psql==NULL.
 * @param in object to copy from
 */
PSQLIO::PSQLIO(const PSQLIO& in) : coordin(in.coordin), coordinparam(in.coordinparam), coordout(in.coordout),
                                   coordoutparam(in.coordoutparam), endpoint(in.endpoint), port(in.port), dbname(in.dbname), userid(in.userid),
                                   passwd(in.passwd), psql(NULL), default_timezone(in.default_timezone), vecMeta(in.vecMeta),
                                   vecFixedStationID(in.vecFixedStationID), vecMobileStationID(in.vecMobileStationID),
                                   sql_meta(in.sql_meta), sql_data(in.sql_data), shadowed_parameters(in.shadowed_parameters) {}
115
116
117

/**
 * @brief Copy assignment, implemented as copy-and-swap.
 * A temporary is copy-constructed from the source and every member of *this is
 * exchanged with the matching member of that temporary, so *this ends up with
 * exactly the state that the copy constructor produces.
 * @param in object to assign from
 * @return reference to *this
 */
PSQLIO& PSQLIO::operator=(const PSQLIO& in)
{
	PSQLIO copy(in); //all the copying happens here, before *this is touched

	//projection parameters
	std::swap(coordin, copy.coordin);
	std::swap(coordinparam, copy.coordinparam);
	std::swap(coordout, copy.coordout);
	std::swap(coordoutparam, copy.coordoutparam);

	//connection parameters and handle
	std::swap(endpoint, copy.endpoint);
	std::swap(port, copy.port);
	std::swap(dbname, copy.dbname);
	std::swap(userid, copy.userid);
	std::swap(passwd, copy.passwd);
	std::swap(psql, copy.psql);

	//stations, queries and exclusions
	std::swap(default_timezone, copy.default_timezone);
	std::swap(vecMeta, copy.vecMeta);
	std::swap(vecFixedStationID, copy.vecFixedStationID);
	std::swap(vecMobileStationID, copy.vecMobileStationID);
	std::swap(sql_meta, copy.sql_meta);
	std::swap(sql_data, copy.sql_data);
	std::swap(shadowed_parameters, copy.shadowed_parameters);

	return *this;
}

141
/// Destructor with an empty body: it performs no action on the psql handle or any other member.
PSQLIO::~PSQLIO() throw() {}
142

143
void PSQLIO::getParameters(const Config& cfg)
144
145
146
147
148
149
150
151
152
{
	port = "5432"; //The default PostgreSQL port

	cfg.getValue("PSQL_URL", "Input", endpoint);
	cfg.getValue("PSQL_PORT", "Input", port, IOUtils::nothrow);
	cfg.getValue("PSQL_DB", "Input", dbname);
	cfg.getValue("PSQL_USER", "Input", userid);
	cfg.getValue("PSQL_PASS", "Input", passwd);

153
	string stations;
154
	cfg.getValue("STATIONS", "Input", stations, IOUtils::nothrow);
155
	IOUtils::readLineToVec(stations, vecFixedStationID, ',');
156

157
	string exclude_file;
158
	cfg.getValue("EXCLUDE", "Input", exclude_file, IOUtils::nothrow);
159
	if (!exclude_file.empty() && IOUtils::fileExists(exclude_file)) {
160
161
162
163
164
		//if this is a relative path, prefix the path with the current path
		const std::string prefix = ( IOUtils::isAbsolutePath(exclude_file) )? "" : cfg.getConfigRootDir()+"/";
		const std::string path = IOUtils::getPath(prefix+exclude_file, true);  //clean & resolve path
		const std::string filename = path + "/" + IOUtils::getFilename(exclude_file);

165
166
167
		create_shadow_map(exclude_file);
	}

168
	cfg.getValue("SQL_META", "Input", sql_meta);
169
170
171
	cfg.getValue("SQL_DATA", "Input", sql_data);

	cfg.getValue("TIME_ZONE", "Input", default_timezone, IOUtils::nothrow);
172
173
}

174
175
176
177
178
179
180
void PSQLIO::create_shadow_map(const std::string& exclude_file)
{
	std::ifstream fin; //Input file streams
	fin.open(exclude_file.c_str(), std::ifstream::in);
	if (fin.fail()) throw FileAccessException(exclude_file, AT);

	try {
181
		const char eoln = IOUtils::getEoln(fin); //get the end of line character for the file
182

183
		vector<string> tmpvec;
184
		string line;
185

186
187
188
189
190
191
		while (!fin.eof()) { //Go through file
			getline(fin, line, eoln); //read complete line meta information
			IOUtils::stripComments(line);
			const size_t ncols = IOUtils::readLineToVec(line, tmpvec, ',');

			if (ncols > 1) {
192
193
194
195
				for(vector<string>::iterator it = tmpvec.begin()+1; it != tmpvec.end(); ++it) {
					IOUtils::toUpper(*it);
				}

196
				set<string> tmpset(tmpvec.begin()+1, tmpvec.end());
197
				shadowed_parameters[ tmpvec[0] ] = tmpset;
198
199
200
201
202
203
			}
		}
	} catch (const std::exception&) {
		fin.close();
		throw;
	}
204

205
206
207
	fin.close();
}

208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
/**
 * @brief Reading a 2D grid by name is not implemented by this plugin.
 * @throws IOException always
 */
void PSQLIO::read2DGrid(Grid2DObject& /*grid_out*/, const std::string& /*name_in*/)
{
	//Nothing so far
	throw IOException("Nothing implemented here", AT);
}

/**
 * @brief Reading a 2D grid by parameter and date is not implemented by this plugin.
 * @throws IOException always
 */
void PSQLIO::read2DGrid(Grid2DObject& /*grid_out*/, const MeteoGrids::Parameters& /*parameter*/, const Date& /*date*/)
{
	//Nothing so far
	throw IOException("Nothing implemented here", AT);
}

/**
 * @brief Reading a DEM is not implemented by this plugin.
 * @throws IOException always
 */
void PSQLIO::readDEM(DEMObject& /*dem_out*/)
{
	//Nothing so far
	throw IOException("Nothing implemented here", AT);
}

/**
 * @brief Reading a land use grid is not implemented by this plugin.
 * @throws IOException always
 */
void PSQLIO::readLanduse(Grid2DObject& /*landuse_out*/)
{
	//Nothing so far
	throw IOException("Nothing implemented here", AT);
}

/**
 * @brief Reading assimilation data is not implemented by this plugin.
 * @throws IOException always
 */
void PSQLIO::readAssimilationData(const Date& /*date_in*/, Grid2DObject& /*da_out*/)
{
	//Nothing so far
	throw IOException("Nothing implemented here", AT);
}

238
void PSQLIO::readMetaData(const std::string& query, std::vector<StationData>& vecStation)
239
{
240
	PGresult *result = sql_exec(query);
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
	if (result) {
		int rows = PQntuples(result);

		int col_id = PQfnumber(result, "id");
		int col_name = PQfnumber(result, "name");
		int col_x = PQfnumber(result, "x");
		int col_y = PQfnumber(result, "y");
		int col_alt = PQfnumber(result, "altitude");
		int col_epsg = PQfnumber(result, "epsg");

		if ((col_id * col_name * col_x * col_y * col_alt * col_epsg) < 0) { //missing column
			throw IOException("Result set does not have all necessary columns", AT);
		}

		for (int ii=0; ii<rows; ii++) {
			int epsg;
			double easting, northing, altitude;

			IOUtils::convertString(epsg, PQgetvalue(result, ii, col_epsg));
			IOUtils::convertString(easting, PQgetvalue(result, ii, col_x));
			IOUtils::convertString(northing, PQgetvalue(result, ii, col_y));
			IOUtils::convertString(altitude, PQgetvalue(result, ii, col_alt));

			Coords point;
			point.setEPSG(epsg);
			point.setXY(easting, northing, altitude);

			StationData sd(point, PQgetvalue(result, ii, col_id), PQgetvalue(result, ii, col_name));
269
			vecStation.push_back(sd); //this is ordered ascending by id
270
271
		}

272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
		PQclear(result);
	}
}

void PSQLIO::readStationData(const Date&, std::vector<StationData>& vecStation)
{
	if (!vecMeta.empty()) {
		vecStation = vecMeta;
		return;
	}

	vecStation.clear();
	string station_list;

	if (vecFixedStationID.empty() && vecMobileStationID.empty()) {
		return; //nothing to do
	} else {
289
		for (vector<string>::const_iterator it = vecFixedStationID.begin(); it != vecFixedStationID.end(); ++it) {
290
291
292
			if (it != vecFixedStationID.begin()) {
				station_list += ", ";
			}
293
			station_list += "'" + *it + "'";
294
295
296
297
298
299
300
301
302
303
304
		}
	}
	
	string query = sql_meta + " (" + station_list + ") ORDER BY id;";
	vector<StationData> tmp_station;
	readMetaData(query, tmp_station);

	for (vector<string>::const_iterator it = vecFixedStationID.begin(); it != vecFixedStationID.end(); ++it) {
		for (vector<StationData>::const_iterator station_it = tmp_station.begin(); station_it != tmp_station.end(); ++station_it) {
			if ((*station_it).stationID == *it) {
				vecStation.push_back(*station_it);
305
			}
306
307
		}
	}
308
309
}

310
311
void PSQLIO::readMeteoData(const Date& dateStart, const Date& dateEnd,
                           std::vector< std::vector<MeteoData> >& vecMeteo, const size_t& stationindex)
312
{
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
	if (vecMeta.empty()) readStationData(dateStart, vecMeta);
	if (vecMeta.empty()) return; //if there are no stations -> return

	size_t indexStart=0, indexEnd=vecMeta.size();

	//The following part decides whether all the stations are rebuffered or just one station
	if (stationindex == IOUtils::npos){
		vecMeteo.clear();
		vecMeteo.insert(vecMeteo.begin(), vecMeta.size(), vector<MeteoData>());
	} else {
		if (stationindex < vecMeteo.size()){
			indexStart = stationindex;
			indexEnd   = stationindex+1;
		} else {
			throw IndexOutOfBoundsException("You tried to access a stationindex in readMeteoData that is out of bounds", AT);
		}
	}

	for (size_t ii=indexStart; ii<indexEnd; ii++){ //loop through stations
		readData(dateStart, dateEnd, vecMeteo[ii], ii);
	}
}

/**
 * @brief Replace the first occurrence of a substring within a string.
 * @param str string to modify in place
 * @param from substring to look for
 * @param to replacement text
 * @return true if an occurrence has been found (and replaced), false otherwise
 */
bool PSQLIO::replace(std::string& str, const std::string& from, const std::string& to)
{
	const std::string::size_type position = str.find(from);
	const bool found = (position != std::string::npos);

	if (found) str.replace(position, from.length(), to);

	return found;
}

void PSQLIO::readData(const Date& dateStart, const Date& dateEnd, std::vector<MeteoData>& vecMeteo, const size_t& stationindex)
{
	string sql_query(sql_data);

	string id = vecFixedStationID.at(stationindex);
	string date_start = dateStart.toString(Date::ISO);
	string date_end = dateEnd.toString(Date::ISO);
	std::replace(date_start.begin(), date_start.end(), 'T', ' ');
	std::replace(date_end.begin(), date_end.end(), 'T', ' ');

	replace(sql_query, "STATIONID", vecMeta.at(stationindex).stationID);
	replace(sql_query, "DATE_START", date_start);
	replace(sql_query, "DATE_END", date_end);

359
	PGresult *result = sql_exec(sql_query);
360
361
362
	if (result) {
		int rows = PQntuples(result);
		int columns = PQnfields(result);
363

364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
		vector<size_t> index;
		MeteoData tmpmeteo;
		tmpmeteo.meta = vecMeta.at(stationindex);

		map_parameters(result, tmpmeteo, index);

		for (int ii=0; ii<rows; ii++) {
			parse_row(result, ii, columns, tmpmeteo, index, vecMeteo);
		}

		PQclear(result);
	}
}

/**
 * @brief Convert one row of a data result set into a MeteoData and append it to vecMeteo.
 * Column 0 is parsed as the timestamp (with a time zone of 0); every other column is
 * written to the parameter given by index, unless it is marked IOUtils::npos or its value
 * is an empty string. The units are converted before the element is stored.
 * @param result result set to read from
 * @param row row to parse
 * @param cols number of columns of the result set
 * @param md template for the new element; its date is set to the timestamp of this row
 * @param index for each column, the MeteoData parameter index (or IOUtils::npos to skip it)
 * @param vecMeteo vector the new element is appended to
 */
void PSQLIO::parse_row(PGresult* result, const int& row, const int& cols, MeteoData& md, std::vector<size_t>& index, std::vector<mio::MeteoData>& vecMeteo)
{
	//The timestamp has to be set BEFORE the template is copied: otherwise the stored
	//element still carries the date of the previously parsed row.
	IOUtils::convertString(md.date, PQgetvalue(result, row, 0), 0.0);
	MeteoData tmp(md);

	for (int ii=1; ii<cols; ii++) {
		if (index[ii] != IOUtils::npos) {
			const string val( PQgetvalue(result, row, ii) );
			if (!val.empty()) IOUtils::convertString(tmp(index[ii]), val);
		}
	}

	convertUnits(tmp);
	vecMeteo.push_back(tmp);
}

/**
 * @brief Work out, for each column of a result set, which MeteoData parameter it feeds.
 * One entry per column is appended to index: IOUtils::npos if the (upper case) column name
 * is excluded for this station, the index of the standard parameter if the name is one of
 * the known ones, or the index of an extra parameter that is added to md under that name.
 * @param result result set whose column names are inspected
 * @param md MeteoData that provides the station id and receives the extra parameters
 * @param index vector that receives one parameter index per column
 */
void PSQLIO::map_parameters(PGresult* result, MeteoData& md, std::vector<size_t>& index)
{
	//upper case column names that map to standard MeteoData parameters
	static const struct {
		const char* name;
		size_t param;
	} known_fields[] = {
		{"RH", MeteoData::RH},
		{"TA", MeteoData::TA},
		{"DW", MeteoData::DW},
		{"VW", MeteoData::VW},
		{"ISWR", MeteoData::ISWR},
		{"RSWR", MeteoData::RSWR},
		{"HS", MeteoData::HS},
		{"IPREC", MeteoData::HNW},
		{"TSS", MeteoData::TSS},
		{"TSG", MeteoData::TSG},
		{"P", MeteoData::P}
	};
	static const size_t nr_known = sizeof(known_fields) / sizeof(known_fields[0]);

	//parameters that are excluded for this station, if any
	const set<string>* excluded = NULL;
	map< string, set<string> >::iterator it = shadowed_parameters.find(md.meta.stationID);
	if (it != shadowed_parameters.end()) excluded = &(it->second);

	const int nr_columns = PQnfields(result);
	for (int col=0; col<nr_columns; col++) {
		const string field_name( IOUtils::strToUpper(PQfname(result, col)) );

		if (excluded != NULL && excluded->count(field_name) > 0) { // Certain parameters may be shadowed
			index.push_back(IOUtils::npos);
			continue;
		}

		bool is_standard = false;
		for (size_t kk=0; kk<nr_known; kk++) {
			if (field_name == known_fields[kk].name) {
				index.push_back(known_fields[kk].param);
				is_standard = true;
				break;
			}
		}

		if (!is_standard) { //this is an extra parameter
			md.addParameter(field_name);
			index.push_back( md.getParameterIndex(field_name) );
		}
	}
}

440
441
442
443
444
/**
* This function checks whether all the MeteoData elements in vecMeteo are consistent
* regarding their meta data (position information, station name). If they are consistent
* true is returned, otherwise false
*/
Thomas Egger's avatar
Thomas Egger committed
445
bool PSQLIO::checkConsistency(const std::vector<MeteoData>& vecMeteo, StationData& sd)
446
{
447
	if (!vecMeteo.empty()) // to get the station data even when in bug 87 conditions
Thomas Egger's avatar
Thomas Egger committed
448
449
450
451
452
		sd = vecMeteo[0].meta;

	for (size_t ii=1; ii<vecMeteo.size(); ii++){
		const Coords& p1 = vecMeteo[ii-1].meta.position;
		const Coords& p2 = vecMeteo[ii].meta.position;
453

Thomas Egger's avatar
Thomas Egger committed
454
455
		if (p1 != p2) {
			//we don't mind if p1==nodata or p2==nodata
456
			if (p1.isNodata()==false && p2.isNodata()==false) return false;
Thomas Egger's avatar
Thomas Egger committed
457
458
459
460
461
462
		}
	}

	return true;
}

463
void PSQLIO::checkForUsedParameters(const std::vector<MeteoData>& vecMeteo, std::vector<bool>& vecParamInUse, std::vector<std::string>& vecColumnName)
Thomas Egger's avatar
Thomas Egger committed
464
{
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
	if (vecMeteo.empty()) return;

	/**
	 * This procedure loops through all MeteoData objects present in vecMeteo and finds out which
	 * meteo parameters are actually in use, i. e. have at least one value that differs from IOUtils::nodata.
	 * If a parameter is in use, then vecParamInUse[index_of_parameter] is set to true and the column
	 * name is set in vecColumnName[index_of_parameter]
	 */
	size_t nr_of_parameters = vecMeteo[0].getNrOfParameters();
	vecParamInUse.resize(nr_of_parameters, false);
	vecColumnName.resize(nr_of_parameters, "NULL");

	for (size_t ii=0; ii<vecMeteo.size(); ii++) {
		for (size_t jj=0; jj<nr_of_parameters; jj++) {
			if (!vecParamInUse[jj]) {
				if (vecMeteo[ii](jj) != IOUtils::nodata) {
					vecParamInUse[jj] = true;
					vecColumnName.at(jj) = vecMeteo[ii].getNameForParameter(jj);
					//cout << "Used Parameter: " << vecColumnName.at(jj) << endl;
				}
			}
		}
	}
}

size_t PSQLIO::checkExistence(const std::vector<StationData>& vec_stations, const StationData& sd)
{
	for (size_t ii=0; ii<vec_stations.size(); ii++) {
		if (sd == vec_stations[ii]) return ii;
	}

	return IOUtils::npos;
}

void PSQLIO::add_meta_data(const unsigned int& index, const StationData& sd)
{
	string stationName = (sd.stationName != "" ? sd.stationName : sd.stationID);

	stringstream values;
	values << "(" << index << "," 
		  << "'" << stationName << "'," << fixed
		  << setprecision(2) << sd.position.getEasting() << ","
		  << sd.position.getNorthing() << "," 
		  << sd.position.getAltitude() << "," 
		  << sd.position.getEPSG() << ")"; 

	string query = "INSERT INTO FIXED_STATION (ID_FIXED_STATION,STATION_NAME,COORD_X,COORD_Y,ALTITUDE, EPSG) VALUES " + values.str() + ";";
	//cout << "EXEC: " << query << endl;
	sql_exec(query);
}

int PSQLIO::get_sensor_index()
{
	int sensor_index = 1;
	string query = "SELECT max(id_fixed_sensor) from fixed_sensor;";

	PGresult *result = sql_exec(query);	
	if (result) {
		int rows = PQntuples(result);
		int columns = PQnfields(result);

		if (rows != 1 || columns != 1) {
			throw IOException("ERROR", AT);
		}

		string val( PQgetvalue(result, 0, 0) );

		IOUtils::convertString(sensor_index, val);
		sensor_index++;

		PQclear(result);
	}

	//cout << "Sensor_index: " << sensor_index << endl;
	return sensor_index;
}

void PSQLIO::add_sensors(const unsigned int& index, const std::vector<std::string>& vecColumnName, std::map<size_t, std::string>& map_sensor_id)
{
	string query = "SELECT id_measurement_type as id, meas_name from measurement_type order by id asc;";
	int sensor_index = get_sensor_index();

	stringstream ss;
	ss << index;
	string station_id = ss.str();

	std::map<size_t, string> map_sensor_type;

	PGresult *result = sql_exec(query);	
	if (result) {
		int rows = PQntuples(result);
		//int columns = PQnfields(result);

		for (int ii=0; ii<rows; ii++) {
			string id( PQgetvalue(result, ii, 0) );
			string type( PQgetvalue(result, ii, 1) );

			IOUtils::toUpper(type);
			IOUtils::trim(type);

			for (size_t jj=0; jj<vecColumnName.size(); jj++) {
				if (type == vecColumnName[jj]) map_sensor_type[jj] = id;
				if (type == "IPREC" && vecColumnName[jj] == "HNW") map_sensor_type[jj] = id;
			}
		}

		PQclear(result);
	} else {
		throw;
	}
	
	// Now actually add all sensors that were identified
	for (map<size_t, string>::const_iterator it = map_sensor_type.begin(); it != map_sensor_type.end(); ++it) {
		ss.str("");
		ss << sensor_index;
		string sensor_id = ss.str();
		string type = it->second;

		query = "INSERT INTO FIXED_SENSOR (ID_FIXED_SENSOR,FK_ID_FIXED_STATION,FK_ID_MEASUREMENT_TYPE,MEAS_HEIGHT) VALUES (" + sensor_id + "," + station_id + "," + type  + ",0.0);";
		//cout << "EXEC: " << query << endl;
		sql_exec(query);

		map_sensor_id[it->first] = sensor_id;

		sensor_index++;
	}
}

/**
 * @brief Empty stub: the body contains no statement, so none of the arguments is read
 * and map_sensor_id is left untouched.
 * NOTE(review): presumably meant to retrieve the sensor ids of an existing station - confirm and implement.
 */
void PSQLIO::get_sensors(const unsigned int& index, const std::vector<std::string>& vecColumnName, std::map<size_t, std::string>& map_sensor_id)
{

}

int PSQLIO::get_measurement_index()
{
	int index = 1;
	string query = "SELECT MAX(ID_FIXED_MEASUREMENT) from fixed_measurement;";

	PGresult *result = sql_exec(query);	
	if (result) {
		int rows = PQntuples(result);
		int columns = PQnfields(result);

		if (rows != 1 || columns != 1) {
			throw IOException("ERROR", AT);
		}

		string val( PQgetvalue(result, 0, 0) );

		IOUtils::convertString(index, val);
		index++;

		PQclear(result);
	}

	//cout << "Measurement index: " << index << endl;
	return index;
}

void PSQLIO::writeMeteoData(const std::vector< std::vector<MeteoData> >& vecMeteo, const std::string&)
{
	// Make sure we have an up to date set of all the stations already available in the DB
	vector<StationData> vecAllStations;
	readMetaData("select id_fixed_station as id, station_name as name, coord_x as x, coord_y as y, altitude, epsg from fixed_station ORDER BY id;", vecAllStations);

	unsigned int index = 1;

	if (vecAllStations.size()) {
		cout << "Found " << vecAllStations.size() << " stations overall, highest id: " << vecAllStations[vecAllStations.size()-1].stationID << endl;
		IOUtils::convertString(index, vecAllStations[vecAllStations.size()-1].stationID);
		index++;
	}

	for (size_t ii=0; ii<vecMeteo.size(); ii++){
		if (!vecMeteo[ii].size()) continue; // in case there is no data, we can't write anything to DB
640

Thomas Egger's avatar
Thomas Egger committed
641
642
		//1. check consistency of station data position -> write location in header or data section
		StationData sd;
643
644
645
646
		vector<bool> vecParamInUse;
		vector<string> vecColumnName;
		map<size_t, string> map_sensor_id;

Thomas Egger's avatar
Thomas Egger committed
647
		sd.position.setProj(coordout, coordoutparam);
648
649
650
		const bool isConsistent = checkConsistency(vecMeteo[ii], sd); // sd will hold valid meta info
		size_t present_index = checkExistence(vecAllStations, sd);
		checkForUsedParameters(vecMeteo[ii], vecParamInUse, vecColumnName);
Thomas Egger's avatar
Thomas Egger committed
651

652

653
654
655
656
657
658
659
660
661
662
		if (isConsistent) { //static station
			present_index = IOUtils::npos;
			if (present_index == IOUtils::npos) { //write into fixed_station
				cout << "Inserting data for station '" << sd.stationName << "' with the id_fixed_station " << index << endl; 
				add_meta_data(index, sd);
				add_sensors(index, vecColumnName, map_sensor_id);
				index++;
			} else { // just get the sensor mappings
				get_sensors((int)present_index, vecColumnName, map_sensor_id);				
			}
Thomas Egger's avatar
Thomas Egger committed
663
		} else { //mobile station
664
665
666
667
668
669
670
671
672
673
674
675
676
677
			throw IOException("Mobile station writing not implemented", AT);
		}

		int currentid = get_measurement_index();
		stringstream ss;
		string query = "INSERT INTO FIXED_MEASUREMENT (ID_FIXED_MEASUREMENT,FK_ID_FIXED_SENSOR,MEAS_DATE,MEAS_VALUE) VALUES ";
		bool comma = false;

		for (size_t jj=0; jj<vecMeteo[ii].size(); jj++) {
			MeteoData tmp(vecMeteo[ii][jj]);
			convertUnitsBack(tmp);

			string timestamp(vecMeteo[ii][jj].date.toString(Date::ISO));
			std::replace( timestamp.begin(), timestamp.end(), 'T', ' ');
Thomas Egger's avatar
Thomas Egger committed
678

679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
			for (map<size_t, string>::const_iterator it = map_sensor_id.begin(); it != map_sensor_id.end(); ++it) {
				ss.str("");
				ss << currentid;
				string id(ss.str());

				ss.str("");
				ss << tmp(it->first);
				string value(ss.str()); 
				string values = "(" + id + "," + it->second + ", TIMESTAMP '" + timestamp + "'," + value + ")";

				if (!comma) {
					comma = true;
				} else {
					query += ",";
				}
				query += values;
				
				currentid++;
			}
Thomas Egger's avatar
Thomas Egger committed
698
		}
699
700
701
702
703
		
		query += ";";
		//cout << "EXEC: " << query << endl;
		sql_exec(query);
	}
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
}

/**
 * @brief Reading points of interest is not implemented by this plugin.
 * @throws IOException always
 */
void PSQLIO::readPOI(std::vector<Coords>&)
{
	//Nothing so far
	throw IOException("Nothing implemented here", AT);
}

/**
 * @brief Writing a 2D grid by name is not implemented by this plugin.
 * @throws IOException always
 */
void PSQLIO::write2DGrid(const Grid2DObject& /*grid_in*/, const std::string& /*name*/)
{
	//Nothing so far
	throw IOException("Nothing implemented here", AT);
}

/**
 * @brief Writing a 2D grid by parameter and date is not implemented by this plugin.
 * @throws IOException always
 */
void PSQLIO::write2DGrid(const Grid2DObject& /*grid_in*/, const MeteoGrids::Parameters& /*parameter*/, const Date& /*date*/)
{
	//Nothing so far
	throw IOException("Nothing implemented here", AT);
}

724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
/**
 * @brief Convert a MeteoData from the internal units to the units of the database.
 * TA, TSG and TSS go through K_TO_C, RH and HS are multiplied by 100 and P is divided
 * by 100. Values equal to IOUtils::nodata are left as they are.
 * @param meteo element to convert in place
 */
void PSQLIO::convertUnitsBack(MeteoData& meteo)
{
	//converts Kelvin to °C
	static const size_t temperatures[] = {MeteoData::TA, MeteoData::TSG, MeteoData::TSS};
	static const size_t nr_temperatures = sizeof(temperatures) / sizeof(temperatures[0]);

	for (size_t ii=0; ii<nr_temperatures; ii++) {
		double& value = meteo(temperatures[ii]);
		if (value != IOUtils::nodata) value = K_TO_C(value);
	}

	//converts RH to [0,100]
	double& humidity = meteo(MeteoData::RH);
	if (humidity != IOUtils::nodata) humidity *= 100.;

	double& snow_height = meteo(MeteoData::HS);
	if (snow_height != IOUtils::nodata) snow_height *= 100.; //is in cm

	double& pressure = meteo(MeteoData::P);
	if (pressure != IOUtils::nodata) pressure /= 100.; //is in mbar
}

752
/**
 * @brief Convert a MeteoData from the units of the database to the internal units.
 * TA, TSG and TSS go through C_TO_K, RH and HS are divided by 100 and P is multiplied
 * by 100. Values equal to IOUtils::nodata are left as they are.
 * @param meteo element to convert in place
 */
void PSQLIO::convertUnits(MeteoData& meteo)
{
	//converts °C to Kelvin
	static const size_t temperatures[] = {MeteoData::TA, MeteoData::TSG, MeteoData::TSS};
	static const size_t nr_temperatures = sizeof(temperatures) / sizeof(temperatures[0]);

	for (size_t ii=0; ii<nr_temperatures; ii++) {
		double& value = meteo(temperatures[ii]);
		if (value != IOUtils::nodata) value = C_TO_K(value);
	}

	//converts RH to [0,1]
	double& humidity = meteo(MeteoData::RH);
	if (humidity != IOUtils::nodata) humidity /= 100.;

	double& snow_height = meteo(MeteoData::HS); //is in cm
	if (snow_height != IOUtils::nodata) snow_height /= 100.;

	double& pressure = meteo(MeteoData::P); //is in mbar
	if (pressure != IOUtils::nodata) pressure *= 100.;
}

780
781
void PSQLIO::open_connection()
{
782
	const string connect = "hostaddr = '" + endpoint +
783
784
785
786
787
788
789
790
791
792
793
794
		"' port = '" + port +
		"' dbname = '" + dbname +
		"' user = '" + userid +
		"' password = '" + passwd +
		"' connect_timeout = '10'";

	psql = PQconnectdb(connect.c_str());

	if (!psql) {
		throw IOException("PSQLIO connection error: PQconnectdb returned NULL", AT);
	}
	if (PQstatus(psql) != CONNECTION_OK) {
795
		cerr << "ERROR" << PQstatus(psql) << endl;
796
797
798
799
		throw IOException("PSQLIO connection error: PQstatus(psql) != CONNECTION_OK", AT);
	}
}

800
/**
 * @brief Execute an SQL statement on a connection that is opened and closed for this call only.
 * @param sql_command statement to execute
 * @return the result if the status is PGRES_TUPLES_OK (e.g. a successful SELECT) or
 * PGRES_COMMAND_OK (a successful command returning no data); the caller has to release it
 * with PQclear. For any other status the statement is printed to cout, the result is
 * released and NULL is returned.
 * @throws IOException if the connection can not be opened (see open_connection)
 */
PGresult *PSQLIO::sql_exec(const string& sql_command)
{
	open_connection();

	PGresult *result = PQexec(psql, sql_command.c_str());
	const ExecStatusType status = PQresultStatus(result);

	//The connection is closed on EVERY path (the error path used to leave it open) and
	//the handle is reset so that it does not dangle. A PGresult stays valid after its
	//connection has been closed.
	close_connection(psql);
	psql = NULL;

	if (status == PGRES_TUPLES_OK || status == PGRES_COMMAND_OK) {
		return result;
	}

	cout << "ERROR while executing the following sql statement: " << sql_command << endl;
	//cout << "BAD SELECT: " << PQresStatus(status) << endl;
	PQclear(result);
	return NULL;
}

/**
 * @brief Close a database connection by handing it over to PQfinish.
 * @param conn connection handle to close; this function does not modify the member psql
 */
void PSQLIO::close_connection(PGconn *conn)
{
    PQfinish(conn);
}

833
} //namespace