python定时从mysql提取数据存入redis的实现-kb88凯时官网登录

时间:2020-05-04
阅读:
免费资源网,https://freexyz.cn/

设计思路:

1.程序一旦run起来,python会把mysql中最近一段时间的数据全部提取出来

2.然后实例化redis类,将数据简单解析后逐条传入redis队列

3.定时器设计每天凌晨12点开始跑

ps:redis是个内存数据库,做后台消息队列的缓存时有很大的用处,有兴趣的小伙伴可以去查看相关的文档。

 # -*- coding:utf-8 -*- 
import mysqldb
import schedule
import time
import datetime
import random
import string
import redis
# get the data from mysql
class fromsql(object):
  def __init__(self, conn):
    self.conn = conn
  def acquire(self):
    cursor = self.conn.cursor()
    try:
      sql = "select * from test where to_days(now()) - to_days(t) <= 1"
      cursor.execute(sql)
      rs = cursor.fetchall()
      #print (rs)
      for eve in rs:
        print('%s, %s, %s, %s' % eve)
      copy_rs = rs
      cursor.close()
      return copy_rs 
    except exception as e:
      print("the error: %s" % e)
class redisqueue(object):
  def __init__(self, name, namespace='queue', **redis_kwargs):
    """the default connection parameters are: host='localhost', port=6379, db=0"""
    self.__db= redis.redis(**redis_kwargs)
    self.key = '%s:%s' %(namespace, name)
  def qsize(self):
    return self.__db.llen(self.key)
  def put(self, item):
    self.__db.rpush(self.key, item)
  def get(self, block=true, timeout=none):
    if block:
      item = self.__db.blpop(self.key, timeout=timeout)
    else:
      item = self.__db.lpop(self.key)
    if item:
      item = item[1]
    return item
  def get_nowait(self):
    return self.get(false)
if __name__ == "__main__":
  # connect mysqldb
  conn_sql = mysqldb.connect(
            host = '127.0.0.1',
            port = 3306,
            user = 'root',
            passwd = '',
            db = 'test',
            charset = 'utf8'
            )
def job_for_redis():
    get_data = fromsql(conn_sql)
    data = get_data.acquire()
    q = redisqueue('test',host='localhost', port=6379, db=0)
    for single_data in data:
      for meta_data in single_data:
        q.put(meta_data)
        print(meta_data)
    print("all data had been inserted.") 
"""
  try:
    schedule.every().day.at("00:00").do(job_for_redis)
  except exception as e:
    print('error: %s'% e)
#  finally:
#    conn.close()
  while true:
    schedule.run_pending()
    time.sleep(1)
"""

补充知识:python定时获取汇率存入数据库

python定时任务:

我们可以使用 轻量级的第三方模块schedule。首先先安装:pip install schedule

定时任务的的小测试:

import schedule
import time
 
def job():
  print("i'm working...")
 
schedule.every(10).minutes.do(job)       # 每隔10分钟执行一次任务
schedule.every().hour.do(job)          # 每隔一小时执行一次任务
schedule.every().day.at("10:30").do(job)    # 每天10:30执行一次任务
schedule.every(5).to(10).days.do(job)      # 每5-10天执行一次任务
schedule.every().monday.do(job)         # 每周一的这个时候执行一次任务
schedule.every().wednesday.at("13:15").do(job) # 每周三13:15执行一次任务
 
while true:
  schedule.run_pending()

获取数据存入数据库:(格式可能不太对,还有一些符号。自己修改一下即可)

import pymysql
import schedule
import time
import requests
import pandas
from sqlalchemy import create_engine
#获取美元的所有外汇
def job():
  content = '美元'
  url = 'http://www.boc.cn/sourcedb/whpj/index.html' #外汇数据地址
  html = requests.get(url).content.decode('utf-8')
  index = html.index(''   content   '')
  str = html[index:index 300]
  result = re.findall('(.*?)',str)
  print("币种:"   result[0])
  print("现汇买入价:"   result[1])
  print("现钞买入价:"   result[2])
  print("现汇卖出价:"   result[3])
  print("现钞卖出价:"   result[4])
  print("中行结算价:"   result[5])
  print("发布时间:"   result[6]   ' '   result[7])
  
 #本地地址 数据库账号 密码  数据库名
  db = pymysql.connect('localhost','root','root','pinyougoudb')
  cursor = db.cursor()
  
 #sql语句
  sql = "update tb_money set huibuy = %s,chaobuy = %s,huisale = %s,chaosale = %s,centerresult= %s,publishtime = '%s' where typeid = '%s'" % (result[1], result[2], result[3], result[4], result[5], result[6]   ' '   result[7], result[0])
  cursor.execute(sql)
  db.commit()
  print('success')
 # 查询语句,将存入的数据查出来
  # sqlalchemy 进行数据库初始化
  engine = create_engine('mysql pymysql://root:root@localhost:3306/pinyougoudb')
  sql = '''select * from tb_money'''
  # pandas 进行数据库读写
  df = pandas.read_sql_query(sql,engine)
  print(df)
  db.commit()
# 每隔几分中刷新一次
#schedule.every(0.1).minutes.do(job)
#每天什么时候刷新
schedule.every().day.at("09:29").do(job)
schedule.every().day.at("09:30").do(job)
#一直循环 知道满足条件执行
while true:
  schedule.run_pending()
免费资源网,https://freexyz.cn/
返回顶部
顶部
网站地图