Python / Datenbank / Querx Beispiel



  • Da der Querx nur begrenzten Speicher hat behält der nur eine gewisse Anzahl an Messungen. Ältere Messungen werden automatisch gelöscht. Mit einem Aufzeichnungsintervall von 5min und aktivierter Kompression komme ich so auf respektable rund 300 Tage.

    Ich will die Daten aber länger speichern und für den schnellen Zugriff in einer SQL-Datenban ablegen (MariaDB unter WSL in meinem Fall).

    Ich habe 1 Querx THP und daher eine DB Namens "phys_data" mit einer Tabelle namens "ClimateData" angelegt mit folgenden Feldern:

    Sensor [TINYTEXT], Time {DATETIME], TemperatureLow [FLOAT], TemperatureAvg [FLOAT], TemperatureHigh [FLOAT], HumidityLow [FLOAT], HumidityAvg [FLOAT], HumidityHigh [FLOAT], PressureLow [FLOAT], PressureAvg [FLOAT], PressureHigh [FLOAT]
    

    Mein Script ist sehr einfach gestrickt und basiert auf der Annahme, dass alle Querxe dieselbe Anzahl Datenfelder haben wie die DB. Es funktioniert auch nur, solange nur ein Querx genutzt wird.

    Das Script sucht in der DB nach dem neuesten Eintrag (egal von welchem Querx). Anschliessend lädt es per Web-API die Daten ab diesem Zeitpunkt in 5min Schritten vom Qerx im XML-Format herunter. Der erste Eintrag wird ignoriert, da er ja mit dem letzten Eintrag in der DB übereinstimmt.

    Diese XML-Daten werden auf Naive Art und Weise geparst und in SQL-Statements verpackt, die dann als Queries an die DB geschickt werden.

    Damit kann ich einfach ab und zu (z.B. einmal pro Tag) die neuesten Daten vom Querx in die DB laden.

    #!/usr/bin/python3
    
    # by Marco Tedaldi, 2021
    # published as public domain except for the parts that are under copyright by someone else (ie the parts copied from egnite example code)
    
    # transfer the data from the querx sesor into the mariadb / mysql-DB
    # procedure:
    # - get the time of the latest data in the database as a Unix Timestamp
    # - forge a ruequest string for the querx http interface and get the data in xml format
    # - convert the data from xml to SQL and store it in the SQL DB
    
    
    import sys
    from MySQLdb import _mysql as mysql  # we want to access a mysql / mariadb
    import http.client # to get the data from the querx
    import xml.etree.ElementTree as ET # to parse the XML from the querx
    from datetime import datetime # to conver the timestamps to datetime for the DB
    
    # access information for the Querx (copied from https://www.egnite.de/support/tutorials/tutorial-zugriff-auf-querx-messdaten-mit-python/)
    querx_address = "192.88.99.2" #name or address of the Querx Sensor
    querx_http_port = 80
    xml_url = "/tpl/document.cgi?tpl/j/datalogger.tpl&format=xml"
    
    # databse access information
    db_address = "localhost"
    db_user = "username"
    db_pass = "VerySecurePW"
    db_name = "phys_data"
    db_table = "ClimateData"
    
    # function to get the timestamp of the last entry
    def get_lastTS(db):
      db.query("""SELECT UNIX_TIMESTAMP(MAX(Time)) FROM ClimateData""")
      result = db.store_result().fetch_row()
      ts = int(result[0][0])
      return(ts)
    
    # copied from https://www.egnite.de/support/tutorials/tutorial-zugriff-auf-querx-messdaten-mit-python/
    # adapted to get historical values
    def get_xml(address, port, url):
      try:
        conn = http.client.HTTPConnection(address, port) # Python 3.x: httplib => http.client
        conn.request("GET", url)
        res = conn.getresponse()
        xml = res.read()
        return(xml)
      except Exception as e:
         print ("Error: " + str(e))
         sys.exit(1)
    
    # main program
    if __name__ == "__main__":
      db = mysql.connect( db_address, db_user, db_pass, db_name)
      lastdata = get_lastTS(db)
      print("Timestamp = " + str(lastdata))
      xml_url = xml_url + "&start=" + str(lastdata) + "&step=300"
      print("xml_url = " + xml_url)
      climatedata = get_xml(querx_address, querx_http_port, xml_url)
      root = ET.fromstring(climatedata)
      sensorname = root.findtext('hostname')
      data = root.find('data')
      for rnr, record in enumerate(data):
        if rnr != 0:
          time = datetime.fromtimestamp(int(record.get('timestamp'))).strftime('%Y-%m-%d %H:%M:%S')
          query = 'INSERT INTO ' + str(db_table) + ' VALUES (\'' + str(sensorname) + '\', \'' + time
          for idx, entry in enumerate(record):
            query = query + '\', \'' + str(entry.get('value'))
          query = query + '\' );'
          print('-> ' + query)
          db.query(query)
      db.close()
    

    Ich bin kein Programmierer sondern hacke mir meine Tools einfach so zusammen, dass sie für mich funktionieren. Diese Software könnte von einem fähigen Programmierer in kurzer Zeit wahrscheinlich massiv verbessert werden um mit Kommandozeilen-Argumenten, mehr als einem Querx und einer anderen Anzahl Spalten in der DB umgehen zu können.

    Ich veröffentliche diesen Code so wie er ist (abgesehen von den Teilen, die ich von egnite kopiert habe) zur freien Verwendung.



  • Hallo Marco,
    das sieht sehr gut aus.
    Vielen Dank fürs Teilen!


  • Administrators

    Hallo Marco,

    ich hätte noch ein paar Verbesserungsvorschläge:

    Aktuell wird der Timestamp in Python in einen String in Lokalzeit umgewandelt. Danach wird er von der Datenbank in DATETIME umgewandelt. Das könnte zu Problemen führen, wenn es eine Zeitumstellung gibt oder Datenbank und Python mit unterschiedlichen Zeitzonen arbeiten. Bei einer Zeitumstellung könnten dann z.B. Daten für eine Stunde verloren gehen. Eine mögliche Lösung wäre es, wenn man den Timestamp direkt als Zahl speichert. Man könnte auch die Zeitzone von Datenbank und Python auf UTC einstellen.

    Aus Sicherheitsgründen wäre es besser, wenn man die SQL-Anfrage nicht komplett als String baut. Wenn sich ein Angreifer als Querx ausgeben kann, dann könnte er z.B. in einem Sensorwert einen SQL-Befehl zum Löschen der Tabelle einschleusen. Stattdessen gibt es normalerweise Funktionen, um in der SQL-Anfrage Platzhalter zu verwenden und die Daten getrennt zu übergeben.



  • @tim Oh, das sind gute Einwände! Danke.
    Die Timestamp im xml scheint in UTC zu sein was die Verarbeitung eigentich sehr einfach machen sollte. Einfach statt datetime.fromstimestamp() durch datetime.utcfromtimestamp() verwenden... allerdings muss ich dann alle Daten aus der Vergangenheit (die aus anderen Backups stammen) ich noch konvertieren...
    Der Aufwand dürfte sich lohnen um zukünftigen Problemen aus dem Weg zu gehen... Am Besten wohl gleich in der DB selbst Timestamp speichern.
    Es wird wohl Zeit für Version 0.2 (die dann wohl auch gar keine Abfrage macht, wenn der neueste Record in der DB weniger als timestep sekunden in der Verganenheit liegt.

    Das mit der Sicherheit stimmt natürlich. Und auch wenn der Zeitgewinn nicht gross sein dürfte, könnte das Zusammenfassen der Anfragen mit Library-Funktionen die Anzahl der Requests auf dem DB-Server u.U. deutlich Reduzieren.
    Andererseits ist soweit ich weiss schon der XML-Parser eigentlich nur für vertrauenswürdiges XML geeignet. D.h. wenn ich das Thema "Sicherheit" ernst nehmen will, muss ich auch dort vorkehrungen treffen.


  • Administrators

    @Marco-T Ja, der Timestamp ist in UTC. Die anderen Attribute date und time sind in Lokalzeit des Querx.



  • Danke an @tim für die inputs. Ich habe etwas weiter gearbeitet und eine (hoffentlich) verbesserte Version erstellt.

    Die Zeit wird nun in UTC in der DB abgelegt und die Daten werden in sichererer (und schnellerer) weise in der DB gespeichert (wobei das Speichern in der DB schon vorher nicht den grössten Teil der Zeit beanspruchte).
    Die neue Version nimmt die Ausführungszeiten der einzelnen Arbeitsschritte auf und zeigt diese am Ende an.

    #!/usr/bin/python3
    
    
    # transfer the data from the querx sesor into the mariadb / mysql-DB
    # procedure:
    # - get the time of the latest data in the database as a Unix Timestamp
    # - forge a request string for the querx http interface and get the data in xml format
    # - convert the data from xml to SQL and store it in the SQL DB
    #
    # by Marco Tedaldi, m.tedaldi aequator.ch 12.2021
    # 20220113 MTE time is stored in UTC in DB
    # 20220112 MTE switched from using mysql to mariadb
    #              split parsing and storing into two separate actions
    #              using .executemany to store all data in one go instead of using many separate SQL statements
    # 20220104 MTE added collection and display of timing information
    
    
    import sys
    import mariadb # mariadb
    import http.client  # to get the data from the querx
    import xml.etree.ElementTree as ET  # to parse the XML from the querx
    from datetime import datetime  # to conver the timestamps to datetime for the DB
    import time  # to measure execution time
    import math
    
    # access information for the Querx (copied from https://www.egnite.de/support/tutorials/tutorial-zugriff-auf-querx-messdaten-mit-python/)
    querx_address = "192.0.2.24"  # name or address of the Querx Sensor
    querx_http_port = 80
    xml_url = "/tpl/document.cgi?tpl/j/datalogger.tpl&format=xml"
    
    # databse access information
    conn_params= {
        "host" : "localhost",
        "user" : "username",
        "password" : "verysecurepassword",
        "database" : "phys_data",
    }
    db_table = "ClimateData"
    
    #copied from https://stackoverflow.com/questions/17973278/python-decimal-engineering-notation-for-mili-10e-3-and-micro-10e-6
    # function to display values in engineering or technical notation
    def eng_string(x, format='%s', si=False):
        '''
        Returns float/int value <x> formatted in a simplified engineering format -
        using an exponent that is a multiple of 3.
    
        format: printf-style string used to format the value before the exponent.
    
        si: if true, use SI suffix for exponent, e.g. k instead of e3, n instead of
        e-9 etc.
    
        E.g. with format='%.2f':
            1.23e-08 => 12.30e-9
                 123 => 123.00
              1230.0 => 1.23e3
          -1230000.0 => -1.23e6
    
        and with si=True:
              1230.0 => 1.23k
          -1230000.0 => -1.23M
        '''
        sign = ''
        if x < 0:
            x = -x
            sign = '-'
        exp = int(math.floor(math.log10(x)))
        exp3 = exp - (exp % 3)
        x3 = round(x / (10 ** exp3), 2)
    
        if si and exp3 >= -24 and exp3 <= 24 and exp3 != 0:
            exp3_text = 'yzafpnum kMGTPEZY'[exp3 // 3 + 8]
        elif exp3 == 0:
            exp3_text = ''
        else:
            exp3_text = 'e%s' % exp3
    
        return ('%s'+format+'%s') % (sign, x3, exp3_text)
    
    
    # function to get the timestamp of the last entry
    def get_lastTS(cursor):
        try:
            cursor.execute("SELECT UNIX_TIMESTAMP(MAX(Time)) FROM ClimateData")
            result = cursor.fetchone()
            ts = int(result[0])
        except:
            print("could not get timestamp from database, using 1.1.1970 00:00")
            ts = 0
        return(ts)
    
    
    # copied from https://www.egnite.de/support/tutorials/tutorial-zugriff-auf-querx-messdaten-mit-python/
    # adapted to get historical values
    def get_xml(address, port, url):
        try:
            # Python 3.x: httplib => http.client
            conn = http.client.HTTPConnection(address, port)
            conn.request("GET", url)
            res = conn.getresponse()
            xml = res.read()
            return(xml)
        except Exception as e:
            print("Error: " + str(e))
            sys.exit(1)
    
    
    # main program
    if __name__ == "__main__":
        runtime = [time.monotonic()]
        print('connecting to SQL database')
        db = mariadb.connect(**conn_params)
        cursor = db.cursor()
        runtime.append(time.monotonic())
        print('connected to SQL database')
        print('getting date and time of last entry')
        lastdata = get_lastTS(cursor)
        runtime.append(time.monotonic())
        date = datetime.fromtimestamp(lastdata).strftime('%d.%m.%Y %H:%M:%S')
        print('timestamp of last data: ' + str(lastdata) + " (" + date + ")")
        xml_url = xml_url + "&start=" + str(lastdata) + "&step=300"
        print("Getting data from http://" + querx_address + xml_url)
        climatedata = get_xml(querx_address, querx_http_port, xml_url)
        runtime.append(time.monotonic())
        print("Data received, now starting to parse")
        root = ET.fromstring(climatedata)
        sensorname = root.findtext('hostname')
        data = root.find('data')
        print("Number of records = " + str(len(data) - 1))
        datalist=[]
        query = 'INSERT INTO ' + str(db_table) + ' VALUES (?,?'
        for rnr, record in enumerate(data):
            if rnr == 0:
                for entry in record:
                    query = query + ',?'
                query = query + ')'
            else:
                datatuple = ()
                dtime = datetime.utcfromtimestamp(int(record.get('timestamp'))).strftime('%Y-%m-%d %H:%M:%S')
                row = [ str(sensorname), dtime]
                for entry in record:
                    row.append(str(entry.get('value')))
                datatuple = tuple(row)
                datalist.append(datatuple)
                print('Records processed: ' + str(rnr), end='\r')
        runtime.append(time.monotonic())
        print("\nWriting data do DB")
        cursor.executemany(query, datalist)
        db.commit()
        runtime.append(time.monotonic())
        cursor.close()
        db.close()
        runtime.append(time.monotonic())
        print('\n')
        print("Runtimes:")
        print("Total Runtime = " +
              eng_string(runtime[5]-runtime[0], si=True) + "s")
        print("Connecting to DB took " +
              eng_string(runtime[1]-runtime[0], si=True) + "s")
        print("Getting Timestamp of last entry took " +
              eng_string(runtime[2]-runtime[1], si=True) + "s")
        print("Getting Data from Querx took " +
              eng_string(runtime[3]-runtime[2], si=True) + "s")
        print("Parsing and converting the data took " +
              eng_string(runtime[4]-runtime[3], si=True) + "s")
        print("Storing the data to DB took " +
              eng_string(runtime[5]-runtime[4], si=True) + "s")
        print("Disconnecting from DB took " +
              eng_string(runtime[6]-runtime[5], si=True) + "s")
    
    


  • Daten aus der SQL-DB ziehen ist ganz einfach... Mit der Zeit wird die Datenmenge aber erheblich. Aber wir können den SQL-Server für uns arbeiten lassen.
    Wenn ich z.B. nur einen Wert pro Tag abfragen will ermittelt dieses Snippet den Tagesdurchschnitt sowie die Minimal- und Maximalwerte.

    SELECT TIME, round(avg(PressureAvg), 1) AS 'avg pressure', MAX(PressureHigh) AS 'max pressure', MIN(PressureLow) AS 'min pressure'
        FROM ClimateData
        GROUP BY CAST(Time AS DATE)
    

    Aber vielleicht will man wissen, wie schnell sich ein Wert geändert hat. Hier kommt die Funktion "LAG()" wie gerufen.

    WITH pressure_lag AS (SELECT Time, PressureAvg,
      LAG(PressureAVG, 11) OVER (ORDER BY Time) AS Pressure1h
      FROM ClimateData)
      SELECT *, COALESCE(ROUND(PressureAVG - Pressure1h,1)) AS Pdiff FROM pressure_lag
      ORDER BY Pdiff DESC
    

    In diesem Beispiel wird eine temporäre Tabelle angelegt in welcher die Zeit, der Luftdruckwert sowie der Luftdruckwert von 11 zeilen davor (1h früher) als Spalten enthalten sind. Der 2. Teil rechnet dann pro Zeile die Differenz aus und gibt das Ergebnis absteigend sortiert aus. So bekommt man die höchsten Werte zuerst angezeigt. Will man die kleinsten Werte zuerst sehen, lässt man einfach das "DESC" am Ende weg.