-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsensor_reader.py
More file actions
executable file
·77 lines (65 loc) · 2.39 KB
/
sensor_reader.py
File metadata and controls
executable file
·77 lines (65 loc) · 2.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#!/usr/bin/env python
# Example for RC timing reading for Raspberry Pi
# Must be used with GPIO 0.3.1a or later - earlier verions
# are not fast enough!
from __future__ import division
from threading import Thread
from model.reading import Reading
import time
import RPi.GPIO as GPIO
import queue
import logging
logging.basicConfig(level=logging.DEBUG, format='[%(levelname)s] (%(threadName)-10s) %(message)s', )
DEBUG = 0
GPIO.setmode(GPIO.BCM)
class SensorReader(Thread):
RCpin = 0
threshold = 0.0
maxLength = 3
readingLimit = 15000
readings = [readingLimit] * maxLength
readingsSum = readingLimit * maxLength
readingIdx = 0
isBlinking = False
def __init__(self, pin, threshold, name):
super(SensorReader, self).__init__(name=name)
self.RCpin = pin
self.threshold = threshold
def saveReading(self, reading):
# remove old reading from array
self.readingsSum -= self.readings[self.readingIdx]
# overwrite old reading with new
self.readings[self.readingIdx] = reading
# add new reading to sum
self.readingsSum += self.readings[self.readingIdx]
if self.readingIdx == self.maxLength - 1:
# reset index if reached max
self.readingIdx = 0
else:
# increase index by one
self.readingIdx += 1
def readSensor(self):
reading = 0
GPIO.setup(self.RCpin, GPIO.OUT)
GPIO.output(self.RCpin, GPIO.LOW)
time.sleep(0.1)
GPIO.setup(self.RCpin, GPIO.IN)
# This takes about 1 millisecond per loop cycle
while GPIO.input(self.RCpin) == GPIO.LOW and reading < self.readingLimit:
reading += 1
return reading
def run(self):
while True:
reading = self.readSensor()
avgReading = (self.readingsSum / self.maxLength)
readingRatio = avgReading / reading
# logging.debug("New reading: %d, avgReading = %.2f, readingRatio = %.2f" %(reading, avgReading, readingRatio))
if readingRatio > self.threshold:
if not self.isBlinking:
logging.debug("Blink!")
readingItem = Reading(reading, readingRatio)
queue.put(readingItem)
self.isBlinking = True
else:
self.isBlinking = False
self.saveReading(reading)