Palaa kohteeseen Home automation

Measurement automation scripts and codes.

STATUS DISPLAY

from inky import InkyPHAT 
from PIL import Image, ImageFont, ImageDraw 
from font_fredoka_one import FredokaOne 
import datetime 
import socket 
import requests 
import psutil 
from time import sleep 
import os
from gpiozero import CPUTemperature

y1 = 0 
y2 = 20 
y3 = 60
y4 = 80

#client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) # UDP 

inky_display = InkyPHAT("black")
inky_display.set_border(inky_display.WHITE)

font1 = ImageFont.truetype(FredokaOne, 20)
font2 = ImageFont.truetype(FredokaOne, 20)

#client.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 
#client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) 
#client.bind(("", 37020)) 

while True:
    client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) # UDP
    client.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    client.bind(("", 37020))
    try:
        CPU_LOAD=str(int(psutil.cpu_percent()))
    except:
        CPU_LOAD="nan"
    try: 
        cpu=CPUTemperature() 
    except: 
        cpu="nan" 
    try: 
        CPU_TEMP=str(int(cpu.temperature)) 
    except: 
        CPU=TEMP="nan" 
    try: 
        if requests.get('https://google.com').ok:
            network_status="UP" 
    except:
        network_status="DN"
    try: 
        client.settimeout(10) 
        data, addr = client.recvfrom(1024) 
        msg=str(data)[2:-1] 
        values=msg.split(',')
        print(addr) 
        print(values[0]+":"+values[1]) 
        print(values[2]+":"+values[3])
        Workroom=values[1]
        Outdoor=values[3]
    except: 
        print("timeout") 
        Workroom="nan"
        Outdoor="nan"
    currentDT = datetime.datetime.now() 
    img = Image.new("P", (inky_display.WIDTH, inky_display.HEIGHT)) 
    draw = ImageDraw.Draw(img) 
    hour=str(currentDT.hour) 
    minute=str(currentDT.minute) 
    year=str(currentDT.year) 
    month=str(currentDT.month) 
    day=str(currentDT.day) 
    try:
        cpu=CPUTemperature() 
    except: 
        cpu="nan" 
    try: 
        testi=6 
    except: 
        print("vituixman") 
    status1=Workroom+"C "+Outdoor+"C"
#    print(status1)
    status2=CPU_LOAD+"% "+CPU_TEMP+"C "+network_status
#    print(status2)
    date=currentDT.strftime("%d.%m.%Y") 
    time=currentDT.strftime("%H:%M") 
    w, h = font1.getsize(date) 
    x1 = (inky_display.WIDTH / 2) - (w / 2) 
    w, h = font1.getsize(time) 
    x2 = (inky_display.WIDTH / 2) - (w / 2) 
    w, h = font1.getsize(status1) 
    x3 = (inky_display.WIDTH / 2) - (w / 2) 
    w, h = font1.getsize(status2) 
    x4 = (inky_display.WIDTH / 2) - (w / 2) 
    draw.text((x1, y1), date, inky_display.BLACK, font1) 
    draw.text((x2, y2), time, inky_display.BLACK, font1) 
    draw.text((x3, y3), status1, inky_display.BLACK, font1) 
    draw.text((x4, y4), status2, inky_display.BLACK, font1) 
    inky_display.set_image(img) 
    try:
        inky_display.show() 
    except: 
        print("paskaksmän")
    sleep(60)

RUUVITAG READING CODE

import signal
import time
import os
import sys
import subprocess

from datetime import datetime

from ruuvitag_sensor.ruuvitag import RuuviTag

#function that has ruuvitag mac address and room name as input. Function gathers data from ruuvitags, and write values to files. 
def ReadTag(MAC,Room):
    sensor=RuuviTag(MAC)
    try:
        data=sensor.update() #update sensor data
        print(data)
    except: #if update fails write to error log file
        ff = open("/automaatio2/ruuvi_error", "a")
        ff.write("Sensors_failed\n")
        ff.close()
    try: #read values from ruuvitag
        line_sen = str(MAC)
        #print(line_sen)
        line_tem = str(data['temperature'])
        #print(line_tem)
        line_hum = str(data['humidity'])
        #print(line_hum)
        line_pre = str(data['pressure'])
        #print(line_pre)
        line_bat = str(data['battery'])
        #print(line_bat)
        file = open('/automaatio2/%s.values' %Room , 'w') #open file to write data to
        file.write(line_sen)
        file.write('\n')
        file.write(line_tem)
        file.write('\n')
        file.write(line_hum)
        file.write('\n')
        file.write(line_pre)
        file.write('\n')
        file.write(line_bat)
        file.close()
        ff = open("/automaatio2/ruuvi_error", "a")
        ff.write(Room)
        ff.close()
        print("jöö")
    except: #if ruuvi read fails write to error log. 
        print("pöö")
        ff = open("/automaatio2/ruuvi_error", "a")
        ff.write(Room + " crashed")
        ff.close()


NumberOfSensors = 6
#mac addresses of ruuvitags
mac1 = 'FC:E2:49:DB:D9:01'#Livingroom**
mac2 = 'F6:46:1D:B4:01:18'#Adhoc**
mac3 = 'D8:06:3F:50:87:99'#BedroomA**
mac4 = 'F0:25:6B:CE:A6:51'#Laundryroom**
mac5 = 'EC:85:7E:11:E0:73'#Kitchen**
mac6 = 'FD:69:92:41:8E:2A'#Guestroom
MacList = [mac1,mac2,mac3,mac4,mac5,mac6] #make list of mac addresses

#rooms linked to mac addresses
room1="livingroom"
room2="adhoc"
room3="bedroomA"
room4="laundryroom"
room5="kitchen"
room6="guestroom"

RoomList = [room1,room2,room3,room4,room5,room6] #make list from room names

time.sleep(600) #on raspi start wait for a while. 


while True:
    #just open error log file and write start
    f = open(os.devnull, 'w')
    ff = open("/automaatio2/ruuvi_error", "a")
    ff.write("Start")
    ff.close()
    for Number in range(NumberOfSensors): #loop through ruuvi sensors
        ReadTag(MacList[Number],RoomList[Number])  #call function
    time.sleep(30)
    os.system("sudo killall hcitool") #because it seems that hcitools wont quit and eventually raspi is exhausted of amount of hcitools, we kill all hcitool processes. 

Simple 28BYJ-48 stepper motor code

import RPi.GPIO as GPIO
import time
def init():
    GPIO.setmode(GPIO.BOARD)
    global control_pins
    control_pins = [7,11,13,15]
    for pin in control_pins:
        GPIO.setup(pin, GPIO.OUT)
        GPIO.output(pin, 0)
global halfstep_seq_ccw
halfstep_seq_ccw = [
  [1,0,0,0],
  [1,1,0,0],
  [0,1,0,0],
  [0,1,1,0],
  [0,0,1,0],
  [0,0,1,1],
  [0,0,0,1],
  [1,0,0,1]
]

global halfstep_seq_cw
halfstep_seq_cw = [
  [1,0,0,1],
  [0,0,0,1],
  [0,0,1,1],
  [0,0,1,0],
  [0,1,1,0],
  [0,1,0,0],
  [1,1,0,0],
  [1,0,0,0]
]

def ccw(steps,delay):
    halfstep_seq_ccw = [
      [1,0,0,0],
      [1,1,0,0],
      [0,1,0,0],
      [0,1,1,0],
      [0,0,1,0],
      [0,0,1,1],
      [0,0,0,1],
      [1,0,0,1]
    ]
    for i in range(steps):
        for halfstep in range(8):
            for pin in range(4):
              GPIO.output(control_pins[pin], halfstep_seq_ccw[halfstep][pin])
            time.sleep(delay)
def cw(steps,delay):
    halfstep_seq_cw = [
      [1,0,0,1],
      [0,0,0,1],
      [0,0,1,1],
      [0,0,1,0],
      [0,1,1,0],
      [0,1,0,0],
      [1,1,0,0],
      [1,0,0,0]
    ]
    for i in range(steps):
      for halfstep in range(8):
        for pin in range(4):
          GPIO.output(control_pins[pin], halfstep_seq_cw[halfstep][pin])
        time.sleep(delay)
def buzz(steps,delay):
    for i in range(steps):
        for halfstep in range(8):
            for pin in range(4):
                GPIO.output(control_pins[pin], halfstep_seq_cw[halfstep][pin])
                time.sleep(delay)
    for i in range(steps):
        for halfstep in range(8):
            for pin in range(4):
                GPIO.output(control_pins[pin], halfstep_seq_ccw[halfstep][pin])
                time.sleep(delay)
if __name__ == '__main__':
#    init()
    while True:
        init()
        direction=input("ccw/cw:    ")
        delay = input("Time Delay (ms)?    ")
        steps = input("How many steps?    ")
        if direction in ['ccw']:
            ccw(int(steps), int(delay)/1000)
        if direction in ['cw']:
            cw(int(steps),int(delay)/1000)
#        buzz(3,1/10000)
#        steps = input("How many steps cw? ")
#        cw(int(delay), int(steps))
GPIO.cleanup()