Mini Projekt – #1 – Jak wykonać prostą automatyzację w Selenium i zintegrować ją z elektroniką?

Wprowadzenie.

We wpisie przedstawiłem, w jaki sposób wykonać prostą automatyzację, gdzie przy pomocy Selenium WebDrivera, Pythona, Raspberry Pi oraz kilku gotowych układów elektronicznych można zrealizować monitorowanie na stronie stanu magazynowego produktu i od otrzymanych danych zrealizować jakąś akcję.

Zakres artykułu.

  • Ogólny zarys projektu
  • Pobranie danych ze strony internetowej przy pomocy Selenium WebDriver
  • Przesłanie danych do Raspberry Pi
  • Wysterowanie pinu na Raspberry Pi
  • Źródła

Ogólny zarys projektu

Założenia projektowe:

  • pobieranie danych ze strony internetowej odnośnie stanu magazynowego produktu,
  • przesłanie danych do Raspberry Pi z wykorzystaniem protokołu UDP,
  • przetworzenie danych i wysterowanie dowolnego układu elektronicznego, którego działenie będzie uzależnione od otrzymanych danych

Pobranie danych ze strony internetowej przy pomocy Selenium WebDriver

Projekt rozpoczniemy od pobrania danych ze strony internetowej. Jako cel obrałem monitorowanie stanu konwertera poziomów logicznych 3,3V / 5V 4-kanałowy, który znajduje się pod następującym linkiem  https://sklep.msalamon.pl/produkt/konwerter-poziomow-logicznych-33v-5v-4-kanalowy/ [1]. Zaznaczę, że między innymi tego konwertera użyjemy w późniejszym etapie projektu.

Dane ze strony internetowej pobierzemy przy pomocy Selenium WebDriver, o którym wspominałem już wcześniej na blogu. Jeżeli chcesz dowiedzieć się podstawach informacji o Selenium, zapraszam Cię pod ten link [2], a jeżeli chcesz zainstalować odpowiednie oprogramowanie, to proponuję przeczytać ten wpis [3].

Znając nasz cel, przechodzimy pod nasz adres URL i otwieramy narzędzia developerskie przyciskiem F12 w przypadku przeglądarki Firefox czy Chrome. Z narzędzi developerskich możemy użyć opcji “Inspector”, gdzie zobaczymy kod strony a następnie narzędzia “inspect”, przy pomocy którego odszukujemy linijkę kodu, która będzie nas interesowała. Poniżaj, zamieszczam zrzut ekranu z danymi, które wykorzystam w moim kodzie.

Po kodzie, który znaleźliśmy, możemy z dużym prawdopodobieństwem stwierdzić, że do szukania naszych danych wystarczy wykorzystać klasę css o nazwie “in-stock“. 

W chwili obecnej mamy już wszystkie dane, które nas interesują i możemy przejść do pisania kodu. 

W pierwszym kroku z modułu selenium musimy zaimportować klasę webdriver oraz z modułu by klasę By. Następnie należy utworzyć instancję klasy webdriver.Firefox(), która pozwoli nam na przeprowadzenie interakcji z przeglądarką. Warto następnie ustawić czas oczekiwania na elementy, ponieważ może zdarzyć się, że mogą być ładowane z opóźnieniem. 

W kolejnym kroku możemy zmaksymalizować okno przeglądarki, a następnie przejść pod nasz docelowy adres URL. Gdy już będziemy na stronie, tworzymy tak zwany lokalizator elementu, który będzie typu tuple i w niej umieszczamy dwa argumenty. Pierwszy argument informować będzie metodę, że chcemy szukać elementu po nazwie klasy, a drugi argument zawierać będzie, konkretną nazwę, którą chcemy znaleźć. W kolejnym kroku wykorzystujemy metodę find_element, w której rozpakowujemy krotkę i wykonujemy na niej właściwość text odpowiedzialną za pobranie tekstu z odnalezionego elementu na stronie.

Gdy już pobraliśmy nasz tekst, możemy zauważyć, że zawiera on więcej danych, niż potrzebujemy, dlatego dodatkowo przy pomocy wyrażeń regularnych możemy wydobyć z naszego stringu jedynie dane, które będą nas interesować. W celu weryfikacji, czy wszystko działa poprawnie dane, które wydobyliśmy, możemy wyświetlić w terminalu. Na koniec jeszcze należy po sobie posprzątać i zamknąć przeglądarkę metodą quite().

Kod takiego programu prezentuje się następująco.

main.py

from selenium import webdriver
from selenium.webdriver.common.by import By
import re

driver = webdriver.Firefox()
driver.implicitly_wait(10)
driver.maximize_window()
driver.get("https://sklep.msalamon.pl/produkt/konwerter-poziomow-logicznych-33v-5v-4-kanalowy/")

in_stock_label = (By.CLASS_NAME, 'in-stock')
in_stock = driver.find_element(*in_stock_label).text

numbers = re.findall(r'\d+', in_stock)

print(f'{numbers[0]}')

driver.quit()

Przesłanie danych do Raspberry Pi

Gdy już mamy przygotowane dane, które chcemy wysłać, musimy napisać program a właściwie dwa programu, które obsłużą nam komunikację. Pierwszy program będzie odbierał dane “receiver.py” i należy umieścić go na Raspberry Pi,  drugi natomiast, będzie wysyłał dane “sender.py“, dlatego umieszczamy go na komputerze.

W celu obsłużenia komunikacji po UDP, należy zaimportować moduł socket. Z modułu socket następnie należy utworzyć instancję klasy, o tej samej nazwie, czyli socket i jako argumenty podać AF_INET odpowiedzialną za poinformowanie, że komunikacja będzie odbywać się po protokołach internetowych oraz jako drugi argument należy podać SOCK_DGRAM informujący, że zostanie użyty protokół UDP. Obie stałe dostępne są pod modułem socket. Gdy już utworzymy instancję, należy przypisać do niej adres IP oraz PORT, po którym będzie odbywała się komunikacja. Port i IP podajemy naszego Raspberry Pi, ponieważ to RPi będzie serwerem. W kolejnej części należy w pętli nasłuchiwać, czy nie pojawiły się dane w buforze w tym celu, należy użyć metody recvfrom, która przyjmuje jako argument rozmiar bufora w bajtach. Metoda ta zwraca dwie wartości, gdzie jedna to dane a druga to adres, z którego przyszły dane w postaci IP i Portu. Tak otrzymane dane są gotowe to ich obrabiania. 

Kod programu prezentuję poniżej.

receiver.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import socket


def send(message):
    UDP_IP = "192.168.1.16"
    UDP_PORT = 5055
    
    sock = socket.socket(socket.AF_INET,        # Internet
                         socket.SOCK_DGRAM)     # UDP
    sock.bind((UDP_IP, UDP_PORT))

    while True:
        data, addr = sock.recvfrom(1024) # buffer size --> 1024 bytes
        print(f'Message: {str(data)}')
        print(f'Address and Port: {addr}')

Drugi program, który będzie wysyłał dane, umieszczamy na komputerze. Podobnie jak wcześniej powtarzamy kroki z zaimportowaniem modułu socket stworzeniem instancji dla klasy socket z tymi samymi argumentami, a jedyna zmiana to użycie metody sendto, która przyjmuje argument w postaci wiadomości typu byte oraz drugi argument typle, która stanowi adres IP i Port serwera, gdzie mają być dostarczone dane.

Kod programu prezentuję poniżej.

sender.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import socket


def send(message):
    UDP_IP = "192.168.1.16"
    UDP_PORT = 5055

    print("Target IP: %s" % UDP_IP)
    print("Target port: %s" % UDP_PORT)
    print("Message: %s" % message)

    sock = socket.socket(socket.AF_INET, # Internet
                         socket.SOCK_DGRAM) # UDP
    sock.sendto(message, (UDP_IP, UDP_PORT))

Wysterowanie pinu na Raspberry Pi

Jednym z ostatnich kroków jest stworzenie tak zwanej logiki biznesowej, gdzie otrzymane dane będę odpowiednio obrabiane. W moim przypadku stworzyłem prosty program, który ma za zadanie jedynie ustawiać pin na stan wysoki, a następnie niski po jedno sekundowym opóźnieniu.

Podzielmy jednak to zadanie na dwa kroku, gdzie w pierwszym napiszemy program, który wysteruje pin a dopiero w drugim kroku połączymy go z daną, którą otrzymaliśmy. W celu wysterowania  pinu należy zaimportować moduł RPi.GPIO, następnie należy ustawić tryb opisu pinów gpio. Ja ustawiłem go na gpio.BCM. W następnym kroku konfigurujemy pin na przykład o numerze 21, aby pełnił funkcję wyjścia i dopiero wówczas możemy przy pomocy metody output ustawić pin 21 na stan wysoki. Na koniec należy po sobie posprzątać korzystając z metody cleanup() tak, aby zwolnić zasoby sprzętowe. Przed posprzątaniem możemy jeszcze dać opóźnienie abyśmy mogli zaobserwować stan wysoki.

Taki kod programu prezentuje się następująco.

test_gpio.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import RPi.GPIO as gpio
import time


def init_gpio():
    gpio.setmode(gpio.BCM)


def main():
    init_gpio()
    gpio.setup(21, gpio.OUT)
    
    gpio.output(21, gpio.HIGH)
    time.sleep(5)

    gpio.cleanup()

if __name__ == '__main__':
    main()

Gdy przetestujemy powyższy kod możemy napisać prostą funkcję do ustawiania pinu w stan wysoki i niski po sekundowym opóźnieniu.

test_gpio.py

def blink():
    gpio.output(21, gpio.HIGH)
    time.sleep(1)
    gpio.output(21, gpio.LOW)
    time.sleep(1)

W ten oto sposób możemy przejść do etapu drugiego i połączyć obydwa programy. 

Do połączenia obsługi nasłuchującego  serwera oraz wysterowania diody możemy użyć na przykład wątków. W jednym wątku będziemy nasłuchiwać, a w drugim będziemy obsługiwać piny. Abyśmy mogli korzystać z wątków należy je zimportować moduł threading, a następnie musimy utworzyć dwa wątki. 

Przykładowa logika całego programu wygląda następująco.

receiver.py – program umieszczony na Raspberry Pi

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import RPi.GPIO as gpio
import threading
import socket
import time
import re


_global_counter = 0
_global_control = ''

def init_gpio():
    gpio.setmode(gpio.BCM)
    gpio.setup(21, gpio.OUT)


def exit_gpio():
    gpio.cleanup()


def blink():
    gpio.output(21, gpio.HIGH)
    time.sleep(1)
    gpio.output(21, gpio.LOW)
    time.sleep(1)
    

def thread_1():
    global _global_counter
    global _global_control

    init_gpio()
    while True:
        if _global_counter > 0:
            blink()
            _global_counter -= 1
            print(f'counter: {_global_counter}')
        else:
            time.sleep(1)

        if _global_control == 'quit':
            break
    exit_gpio()


def thread_2():
    global _global_counter
    global _global_control

    UDP_IP = "192.168.1.16"
    UDP_PORT = 5055
    
    sock = socket.socket(socket.AF_INET,        # Internet
                         socket.SOCK_DGRAM)     # UDP
    sock.bind((UDP_IP, UDP_PORT))

    while True:
        data, addr = sock.recvfrom(1024) # buffer size --> 1024 bytes
        print(f'Message: {str(data)}')
        print(f'Address and Port: {addr}')

        if data == b'quit':
            print('Bye bye!')
            _global_control = 'quit'
            break

        numbers = re.findall(r'\d+', str(data))
        if isinstance(numbers, list) and numbers:
            print(f'{numbers[0]}')
            _global_counter = int(numbers[0])
        

def main():

    thread1 = threading.Thread(target=thread_1)
    thread2 = threading.Thread(target=thread_2)

    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()


if __name__ == '__main__':
    main()

Gdy już sterujemy pinem, możemy stworzyć nasz układ elektroniczny. Do ustawianego pinu możemy podłączyć konwerter napięć [1] a następnie dzięki temu, że już mamy dostęp do sterowania napięciem 5V możemy podłączyć na przykład moduł przekaźnikowy [4]. Do modułu przekaźnikowego możemy następnie podłączyć element potrzebujący jeszcze wyższego napięcia. Oczywiście do samego pinu możemy też podpiąć diodę led [5] z odpowiednim rezystorem, żebyśmy mogli już w trakcie pisania programu sprawdzać, czy wszystko działa poprawnie. Wszystkie te elementy możemy kupić w sklepie https://sklep.msalamon.pl/.

Kod programu

Żeby łatwiej było obsługiwać program na komputerze, również zastosowałem wątki, gdzie utworzyłem proste menu do sterowania programem. Po uruchomieniu programu mamy możliwość wyboru trzech opcji. 1 – odpowiedzialna za pobranie danych ze strony i wysłanie ich na serwer. 2 – wysłanie danych o stringu zawierającym jedynie “0”. 6 – wysłanie polecenia zamknięcia programu zarówno na komputerze, jak i na serwerze.

Prezentowany kod ma jedynie za zadanie pomóc w przeprowadzeniu nas przez początkowe etapy projektu i został w bardzo dużym stopniu uproszczony.

main.py – program umieszczony na komputerze

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from selenium import webdriver
from selenium.webdriver.common.by import By
import re
import threading
import time

import sender


_global_control = ''

def thread_1():
    global _global_control

    while True:
        if _global_control == '':
            time.sleep(1)
            continue

        print("1. Check stock levels.")
        print("2. Reset counter.")
        print("6. Quit")

        wybor = input("Select the option: ")

        if wybor == '1':
            print("Selected: Check stock levels.")
            _global_control = 'check_stock_levels'
        elif wybor == '2':
            print("Selected: Reset counter.")
            _global_control = 'reset_counter'
        elif wybor == '6' or wybor == 'quit':
            _global_control = 'quit'
            break
        else:
            print("Incorrect selection. Try again.")


def thread_2():
    global _global_control
    _global_control = ''

    while(True):
        time.sleep(1)

        if _global_control == 'check_stock_levels':
            try:
                driver = webdriver.Firefox()
                driver.implicitly_wait(10)
                driver.maximize_window()
                driver.get("https://sklep.msalamon.pl/produkt/konwerter-poziomow-logicznych-33v-5v-4-kanalowy/")

                in_stock_label = (By.CLASS_NAME, 'in-stock')
                in_stock = driver.find_element(*in_stock_label).text

                numbers = re.findall(r'\d+', in_stock)

                print(f'{numbers[0]}')

                sender.send(bytes(f'{numbers[0]}', 'utf-8'))

                driver.quit()
                _global_control = ''
            except:
                print('Something went wrong :/')
        elif _global_control == 'reset_counter':
            sender.send(bytes('0', 'utf-8'))
        elif _global_control == 'quit':
            print('Bye bye!')
            sender.send(bytes('quit', 'utf-8'))
            _global_control = ''
            break
        _global_control = None

    return 0


def main():

    thread1 = threading.Thread(target=thread_1)
    thread2 = threading.Thread(target=thread_2)

    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()


if __name__ == '__main__':
    main()

sender.py – program umieszczony na komputerze

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import socket


def send(message):
    UDP_IP = "192.168.1.16"
    UDP_PORT = 5055

    print("Target IP: %s" % UDP_IP)
    print("Target port: %s" % UDP_PORT)
    print("Message: %s" % message)

    sock = socket.socket(socket.AF_INET, # Internet
                         socket.SOCK_DGRAM) # UDP
    sock.sendto(message, (UDP_IP, UDP_PORT))

receiver.py – program umieszczony na Raspberry Pi

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import RPi.GPIO as gpio
import threading
import socket
import time
import re


_global_counter = 0
_global_control = ''

def init_gpio():
    gpio.setmode(gpio.BCM)
    gpio.setup(21, gpio.OUT)


def exit_gpio():
    gpio.cleanup()


def blink():
    gpio.output(21, gpio.HIGH)
    time.sleep(1)
    gpio.output(21, gpio.LOW)
    time.sleep(1)
    

def thread_1():
    global _global_counter
    global _global_control

    init_gpio()
    while True:
        if _global_counter > 0:
            blink()
            _global_counter -= 1
            print(f'counter: {_global_counter}')
        else:
            time.sleep(1)

        if _global_control == 'quit':
            break
    exit_gpio()


def thread_2():
    global _global_counter
    global _global_control

    UDP_IP = "192.168.1.16"
    UDP_PORT = 5055
    
    sock = socket.socket(socket.AF_INET,        # Internet
                         socket.SOCK_DGRAM)     # UDP
    sock.bind((UDP_IP, UDP_PORT))

    while True:
        data, addr = sock.recvfrom(1024) # buffer size --> 1024 bytes
        print(f'Message: {str(data)}')
        print(f'Address and Port: {addr}')

        if data == b'quit':
            print('Bye bye!')
            _global_control = 'quit'
            break

        numbers = re.findall(r'\d+', str(data))
        if isinstance(numbers, list) and numbers:
            print(f'{numbers[0]}')
            _global_counter = int(numbers[0])
        

def main():

    thread1 = threading.Thread(target=thread_1)
    thread2 = threading.Thread(target=thread_2)

    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()


if __name__ == '__main__':
    main()

Źródła

Autor artykułu
Dominik Bednarski

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany.