Python threading: thread-safe producer-consumer with Queue

Question

I am using threading and Queue to fetch URLs and store the results in a database.
I want just one thread to do the storing.
So I wrote the code below:

import threading
import time

import Queue

site_count = 10

fetch_thread_count = 2

site_queue = Queue.Queue()
proxy_array=[]        


class FetchThread(threading.Thread):
    def __init__(self,site_queue,proxy_array):
        threading.Thread.__init__(self)
        self.site_queue = site_queue
        self.proxy_array = proxy_array
    def run(self):
        while True:
            index = self.site_queue.get()
            self.get_proxy_one_website(index)
            self.site_queue.task_done()
    def get_proxy_one_website(self,index):
        print '{0} fetched site :{1}\n'.format(self.name,index)
        self.proxy_array.append(index)


def save():
    while True:
        if site_queue.qsize() > 0:
            if len(proxy_array) > 10:
                print 'save :{0}  to database\n'.format(proxy_array.pop())

            else:
                time.sleep(1)
        elif len(proxy_array) > 0:
            print 'save :{0} to database\n'.format(proxy_array.pop())

        elif len(proxy_array) == 0:
            print 'break'
            break
        else:
            print 'continue'
            continue

def start_crawl():
    global site_count,fetch_thread_count,site_queue,proxy_array
    print 'init'
    for i in range(fetch_thread_count):
        ft = FetchThread(site_queue,proxy_array)
        ft.setDaemon(True)
        ft.start()

    print 'put site_queue'
    for i in range(site_count):
        site_queue.put(i)

    save()

    print 'start site_queue join'
    site_queue.join()
    print 'finish'

start_crawl()

Output:

init
put site_queue
Thread-1 fetched site :0

Thread-2 fetched site :1

Thread-1 fetched site :2

Thread-2 fetched site :3

Thread-1 fetched site :4

Thread-2 fetched site :5

Thread-1 fetched site :6

Thread-2 fetched site :7

Thread-1 fetched site :8

Thread-2 fetched site :9

save :9 to database

save :8 to database

save :7 to database

save :6 to database

save :5 to database

save :4 to database

save :3 to database

save :2 to database

save :1 to database

save :0 to database

break
start site_queue join
finish
[Finished in 1.2s]

Why does save() seem to run after site_queue.join(), which is written after save()?
I have also replaced save() with a thread function, but that doesn't work either.
Does it mean I must change proxy_array=[] to proxy_queue=Queue.Queue() before I can use threading to store the data?
I want just one thread to do the storing, and no other thread would get data from proxy_array, so why should I join it? Using a Queue seems very weird.
Is there a better solution?

UPDATE:
I don't want to wait until all the FetchThreads have completed their work. I want to save data while fetching, which would be much faster. I want the result to look something like the output below (because I use array.pop(), "save 0" may appear much later; this is just an example to make it easy to understand):

Thread-2 fetched site :1

Thread-1 fetched site :2

save :0 to database

Thread-2 fetched site :3

Thread-1 fetched site :4

save :2 to database

save :3 to database


Thread-2 fetched site :5

Thread-1 fetched site :6

save :4 to database
.......

UPDATE 2, for anyone with the same questions as below:

Question:
As I said above, no other thread will get data from proxy_array.
I just cannot imagine why that would break thread safety.

Answer:
It is the producer-consumer problem described in misha's answer; I understood after reading it carefully.


Question:
One more thing: can the program's main thread act as the consumer alongside the FetchThreads (in other words, without creating a StoreThread)?

This is what I cannot figure out; I will update after I find the answer.
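
On that last question, one possible arrangement is sketched here. It assumes Python 3, where the module is named queue rather than Queue. The names DONE, fetch_worker and crawl_and_save are hypothetical, and the "fetch" and "save" steps are stand-ins. Each fetch thread pushes results to an output queue and sends a marker when it runs out of work; the main thread consumes until it has seen one marker per fetch thread, so no separate store thread is created.

```python
import queue
import threading

DONE = object()  # hypothetical marker a fetch thread sends when it has no more work

def fetch_worker(input_queue, output_queue):
    while True:
        try:
            site = input_queue.get_nowait()
        except queue.Empty:
            # All sites were queued before the workers started,
            # so an empty input queue means this worker is finished.
            output_queue.put(DONE)
            return
        output_queue.put(site * 10)  # stand-in for real fetched data

def crawl_and_save(site_count=6, fetch_thread_count=2):
    input_queue = queue.Queue()
    for i in range(site_count):
        input_queue.put(i)
    output_queue = queue.Queue()
    fetchers = [
        threading.Thread(target=fetch_worker, args=(input_queue, output_queue))
        for _ in range(fetch_thread_count)
    ]
    for t in fetchers:
        t.start()
    saved = []
    finished = 0
    # The main thread is the only consumer; get() blocks until data arrives.
    while finished < fetch_thread_count:
        data = output_queue.get()
        if data is DONE:
            finished += 1
        else:
            saved.append(data)  # stand-in for a database write
    for t in fetchers:
        t.join()
    return saved

saved = crawl_and_save()
print(sorted(saved))  # [0, 10, 20, 30, 40, 50]
```

Because each worker sends its marker only after its last result, the main thread has already consumed every result by the time it has counted all the markers.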

Recommended answer

I recommend you read about the producer-consumer problem. Your producers are the fetch threads. Your consumer is the save function. If I understand correctly, you want the consumer to save each fetched result as soon as it's available. For this to work, the producer and consumer must be able to communicate in some thread-safe way (e.g. a queue).

Basically, you need another queue. It would replace proxy_array. Your save function will look something like this:

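while True:
    try:
        data = fetch_data_from_output_queue()
        save_to_database(data)
    except EmptyQueue:
        if stop_flag.is_set():
            # Fetching has finished and the queue is drained: all done
            break
        # Fetch threads are still running: wait for more results
        time.sleep(1)
        continue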

This save function will need to run in its own thread. stop_flag is an Event that gets set after you join your fetch threads.
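
The loop above uses placeholder names. As a rough sketch of how it might map onto the standard library, assuming Python 3 (module queue instead of Queue): Queue.get(timeout=...) raises queue.Empty when nothing arrives in time, and threading.Event provides the stop flag. The name save_worker and the saved list, which stands in for the database, are illustrative only. The flag is read before each get() so that an item enqueued just before the flag was set is not missed.

```python
import queue
import threading

def save_worker(output_queue, stop_flag, saved):
    while True:
        # Read the flag first: if it was already set and the queue then
        # turns out to be empty, nothing more can arrive.
        done = stop_flag.is_set()
        try:
            data = output_queue.get(timeout=0.1)
        except queue.Empty:
            if done:
                break
            continue
        saved.append(data)  # stand-in for save_to_database(data)

output_queue = queue.Queue()
stop_flag = threading.Event()
saved = []

saver = threading.Thread(target=save_worker, args=(output_queue, stop_flag, saved))
saver.start()

for i in range(5):       # the main thread plays the producer in this demo
    output_queue.put(i)

stop_flag.set()          # no more data will be produced
saver.join()             # wait until everything queued has been saved
print(saved)             # [0, 1, 2, 3, 4]
```

Using get() with a timeout replaces the separate time.sleep(1) call: the thread wakes as soon as data arrives and only rechecks the flag when the queue stays empty.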

From a high level, your application will look like this:

input_queue = initialize_input_queue()
output_queue = initialize_output_queue()

stop_flag = Event()
create_and_start_save_thread(output_queue) # read from output queue, save to DB
create_and_start_fetch_threads(input_queue, output_queue) # get sites to crawl from input queue, push crawled results to output_queue
join_fetch_threads() # this will block until the fetch threads have gone through everything in the input_queue
stop_flag.set() # this will inform the save thread that we are done
join_save_thread() # wait for all the saving to complete
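
The outline above could be filled in along the following lines. This is only a sketch under stated assumptions: it targets Python 3 (module queue), the fetch and save steps are stand-ins, and the names fetch_worker, save_worker, crawl and STOP are hypothetical. It also swaps one technique: instead of the stop_flag Event, the main thread pushes a sentinel value onto the output queue after joining the fetch threads, so the save thread can block on get() rather than poll.

```python
import queue
import threading

STOP = None  # sentinel (an assumption of this sketch): real results must never be None

def fetch_worker(input_queue, output_queue):
    while True:
        try:
            site = input_queue.get_nowait()
        except queue.Empty:
            return  # every site was queued before the workers started
        # Stand-in for a real fetch; the result is handed over immediately.
        output_queue.put("page for site {0}".format(site))

def save_worker(output_queue, saved):
    while True:
        data = output_queue.get()  # blocks until a result or the sentinel arrives
        if data is STOP:
            return
        saved.append(data)  # stand-in for a database write

def crawl(site_count=10, fetch_thread_count=2):
    input_queue = queue.Queue()
    for i in range(site_count):
        input_queue.put(i)
    output_queue = queue.Queue()
    saved = []

    saver = threading.Thread(target=save_worker, args=(output_queue, saved))
    saver.start()
    fetchers = [
        threading.Thread(target=fetch_worker, args=(input_queue, output_queue))
        for _ in range(fetch_thread_count)
    ]
    for t in fetchers:
        t.start()
    for t in fetchers:
        t.join()              # all results are now on the output queue
    output_queue.put(STOP)    # plays the role of stop_flag.set()
    saver.join()              # wait for all the saving to complete
    return saved

saved = crawl()
print(len(saved))  # 10
```

Saving overlaps with fetching because the save thread starts first and consumes results as they arrive; the order of the saved items depends on thread scheduling.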
