下面的 Python 脚本通过串口读取 Arduino 的数据并上传到 api.heclouds.com,同时轮询本地文件 1.txt 来控制开关。废话不多说,直接看代码:
# -*- coding:utf-8 -*-
# file: ceshitianqi
"""Relay Arduino readings from a serial port to the heclouds datapoint API.

The script repeatedly:
  1. reads ``1.txt`` and forwards an on/off command to the serial device,
  2. asks the device for a reading (command "a" or "b"),
  3. posts that reading to the HTTP API as a datapoint.

NOTE: this targets Python 2 (it relies on ``urllib2`` and on ``str`` being
a byte string when writing to the serial port).
"""
import urllib2
import json
import time
import datetime
import serial
import random
import os
import sys

apikey = 'zpdlyl***='  # replace with your own API key

# Class and device names are case-sensitive: the pySerial class is
# ``serial.Serial`` and Linux USB serial adapters appear as /dev/ttyUSB<n>.
ser = serial.Serial("/dev/ttyUSB2", 9600, timeout=1)

# Serial command letter -> id of the datastream its reply is uploaded to.
STREAM_IDS = {"a": "humidity", "b": "temperature"}


def read(key):
    """Send ``key`` to the serial device and return its reply.

    Args:
        key: command string written verbatim to the serial port.

    Returns:
        Whatever ``ser.readall()`` yields after waiting one second.
    """
    ser.write(key)
    print("output:" + key)
    time.sleep(1)  # give the device time to answer before draining the port
    response = ser.readall()
    print(response)
    print(type(response))
    return response


def http_put(key):
    """Read one value from the device and upload it as a datapoint.

    Args:
        key: "a" (uploaded to the "humidity" stream) or "b" (uploaded to
            the "temperature" stream).

    Returns:
        The raw body of the HTTP response.

    Raises:
        ValueError: if ``key`` has no associated datastream.
    """
    # Validate before touching the serial port; previously an unknown key
    # only failed later with an unbound ``values`` variable.
    if key not in STREAM_IDS:
        raise ValueError("unsupported key: %r" % (key,))

    val = read(key)  # fetch the reading from the Arduino
    curtime = datetime.datetime.now()
    url = 'http://api.heclouds.com/devices/**1/datapoints'
    print(type(val))

    values = {
        'datastreams': [{
            "id": STREAM_IDS[key],
            "datapoints": [{"at": curtime.isoformat(), "value": val}],
        }]
    }
    jdata = json.dumps(values)  # serialise the payload as JSON
    print(jdata)

    request = urllib2.Request(url, jdata)
    request.add_header('api-key', apikey)
    # HTTP method tokens are case-sensitive, so this must be upper-case.
    request.get_method = lambda: 'POST'
    response = urllib2.urlopen(request)
    try:
        return response.read()
    finally:
        response.close()


while True:
    for command in ("a", "b"):
        # Poll the control file. Surrounding whitespace is stripped so that
        # both "1" and "1\n" are recognised.
        with open('1.txt') as control_file:
            flag = control_file.read().strip()
        if flag == "1":
            ser.write("c")  # per the article's notes: 1 switches the light on
        elif flag == "0":
            ser.write("d")  # per the article's notes: 0 switches it off
        http_put(command)
        time.sleep(2)
说明:脚本在循环中不断读取(轮询)文件 1.txt:
内容为 1 时向串口发送 "c",点亮;
内容为 0 时向串口发送 "d",关闭。
补充知识:python笔记(轮询、长轮询)
一、轮询
views.py
"""Minimal Flask voting app whose clients poll ``/get/vote`` for the tally."""
from flask import Flask, render_template, request, jsonify, abort

app = Flask(__name__)

# In-memory tally keyed by candidate id. Keys are strings because they are
# looked up with the ``uid`` value taken from the submitted form.
users = {
    '1': {'name': '贝贝', 'count': 1},
    '2': {'name': '小东北', 'count': 0},
    '3': {'name': '何伟明', 'count': 0},
}


@app.route('/user/list')
def user_list():
    """Render the candidate list together with the current counts."""
    return render_template('user_list.html', users=users)


@app.route('/vote', methods=['POST'])
def vote():
    """Add one vote to the candidate named by the ``uid`` form field.

    Responds with HTTP 400 when ``uid`` is missing or unknown instead of
    failing with an unhandled ``KeyError``.
    """
    uid = request.form.get('uid')
    if uid not in users:
        abort(400)
    # Increment rather than assign, otherwise every count would stay at 1.
    users[uid]['count'] += 1
    return "投票成功"


@app.route('/get/vote', methods=['GET'])
def get_vote():
    """Return the whole tally as JSON; clients call this repeatedly."""
    return jsonify(users)


if __name__ == '__main__':
    app.run(threaded=True)
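user_list.html(模板;原文的 HTML 标签在此处丢失,前端脚本部分似乎也未保留,以下仅还原可见的标记结构)
<!DOCTYPE html>
<html>
<head>
    <title>title</title>
</head>
<body>
<ul>
    {% for key,val in users.items() %}
    <li>{{val.name}} ({{val.count}})</li>
    {% endfor %}
</ul>
</body>
</html>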
二、长轮询
views.py
"""Flask voting app using long polling.

Each page load registers a per-client queue. A vote pushes the tally onto
every queue, and ``/get/vote`` blocks on the caller's queue for up to five
seconds waiting for such an update.
"""
from flask import Flask, render_template, request, jsonify, session, abort
import uuid
import queue

app = Flask(__name__)
# NOTE(review): hard-coded session signing key; load it from configuration
# before deploying anywhere that matters.
app.secret_key = 'asdfasdfasd'

# In-memory tally keyed by candidate id (string, as received from the form).
users = {
    '1': {'name': '贝贝', 'count': 1},
    '2': {'name': '小东北', 'count': 0},
    '3': {'name': '何伟明', 'count': 0},
}

# Client uuid -> queue of pending updates for that client.
# NOTE(review): entries are never removed, so this grows with every page load.
queque_dict = {}


@app.route('/user/list')
def user_list():
    """Register a fresh update queue for this client and render the list."""
    user_uuid = str(uuid.uuid4())
    queque_dict[user_uuid] = queue.Queue()
    session['current_user_uuid'] = user_uuid
    return render_template('user_list.html', users=users)


@app.route('/vote', methods=['POST'])
def vote():
    """Add one vote for the ``uid`` form field and notify every client.

    Responds with HTTP 400 when ``uid`` is missing or unknown instead of
    failing with an unhandled ``KeyError``.
    """
    uid = request.form.get('uid')
    if uid not in users:
        abort(400)
    # Increment rather than assign, otherwise every count would stay at 1.
    users[uid]['count'] += 1
    for pending in queque_dict.values():
        pending.put(users)
    return "投票成功"


@app.route('/get/vote', methods=['GET'])
def get_vote():
    """Wait up to five seconds for a tally update for the calling client.

    Returns JSON ``{"status": true, "data": <tally>}`` when an update
    arrived, or ``{"status": false, "data": null}`` on timeout. Responds
    with HTTP 400 when the session has no registered queue, i.e. the client
    has not loaded ``/user/list`` since this process started.
    """
    pending = queque_dict.get(session.get('current_user_uuid'))
    if pending is None:
        abort(400)

    ret = {'status': True, 'data': None}
    try:
        ret['data'] = pending.get(timeout=5)
    except queue.Empty:
        ret['status'] = False
    return jsonify(ret)


if __name__ == '__main__':
    app.run(threaded=True)
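user_list.html(模板;原文的 HTML 标签在此处丢失,前端脚本部分似乎也未保留,以下仅还原可见的标记结构)
<!DOCTYPE html>
<html>
<head>
    <title>title</title>
</head>
<body>
<ul>
    {% for key,val in users.items() %}
    <li>{{val.name}} ({{val.count}})</li>
    {% endfor %}
</ul>
</body>
</html>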