python实现的文件同步服务器实例

发表于 5年以前  | 总阅读数:701 次

本文实例讲述了python实现的文件同步服务器。分享给大家供大家参考。具体实现方法如下:

服务端使用asyncore, 收到文件后保存到本地。

客户端使用pyinotify监视目录的变化 ,把变动的文件发送到服务端。

重点:

1. 使用structs打包发送文件的信息,服务端收到后,根据文件信息来接收客户端传送过来的文件。

2. 客户端使用多线程,pyinotify监视到文件变化,放到队列中,由另外一个线程发送。

上代码:

服务端:


    # receive file from client and store them into file use asyncore.# 
    #/usr/bin/python 
    #coding: utf-8 
    import asyncore 
    import socket 
    from socket import errno 
    import logging 
    import time 
    import sys 
    import struct 
    import os 
    import fcntl 
    import threading 
    from rrd_graph import MakeGraph 
    try: 
      import rrdtool 
    except (ImportError, ImportWarnning): 
      print "Hope this information can help you:" 
      print "Can not find pyinotify module in sys path, just run [apt-get install python-rrdtool] in ubuntu." 
      sys.exit(1) 
    class RequestHandler(asyncore.dispatcher): 
      def __init__(self, sock, map=None, chunk_size=1024): 
        self.logger = logging.getLogger('%s-%s' % (self.__class__.__name__, str(sock.getsockname()))) 
        self.chunk_size = chunk_size 
        asyncore.dispatcher.__init__(self,sock,map) 
        self.data_to_write = list() 
      def readable(self): 
        #self.logger.debug("readable() called.") 
        return True 
      def writable(self): 
        response = (not self.connected) or len(self.data_to_write) 
        #self.logger.debug('writable() -> %s data length -> %s' % (response, len(self.data_to_write))) 
        return response 
      def handle_write(self): 
        data = self.data_to_write.pop() 
        #self.logger.debug("handle_write()->%s size: %s",data.rstrip('\r\n'),len(data)) 
        sent = self.send(data[:self.chunk_size]) 
        if sent < len(data): 
          remaining = data[sent:] 
          self.data_to_write.append(remaining) 
      def handle_read(self): 
        self.writen_size = 0 
        nagios_perfdata = '../perfdata' 
        head_packet_format = "!LL128s128sL" 
        head_packet_size = struct.calcsize(head_packet_format) 
        data = self.recv(head_packet_size) 
        if not data: 
          return 
        filepath_len, filename_len, filepath,filename, filesize = struct.unpack(head_packet_format,data) 
        filepath = os.path.join(nagios_perfdata, filepath[:filepath_len]) 
        filename = filename[:filename_len] 
        self.logger.debug("update file: %s" % filepath + '/' + filename)
        try: 
          if not os.path.exists(filepath): 
            os.makedirs(filepath) 
        except OSError: 
          pass 
        self.fd = open(os.path.join(filepath,filename), 'w') 
        #self.fd = open(filename,'w') 
        if filesize > self.chunk_size: 
          times = filesize / self.chunk_size 
          first_part_size = times * self.chunk_size 
          second_part_size = filesize % self.chunk_size 
          while 1: 
            try: 
              data = self.recv(self.chunk_size) 
              #self.logger.debug("handle_read()->%s size.",len(data)) 
            except socket.error,e: 
              if e.args[0] == errno.EWOULDBLOCK: 
                print "EWOULDBLOCK" 
                time.sleep(1) 
              else: 
                #self.logger.debug("Error happend while receive data: %s" % e) 
                break 
            else: 
              self.fd.write(data) 
              self.fd.flush() 
              self.writen_size += len(data) 
              if self.writen_size == first_part_size: 
                break 
          #receive the packet at last 
          while 1: 
            try: 
              data = self.recv(second_part_size) 
              #self.logger.debug("handle_read()->%s size.",len(data)) 
            except socket.error,e: 
              if e.args[0] == errno.EWOULDBLOCK: 
                print "EWOULDBLOCK" 
                time.sleep(1) 
              else: 
                #self.logger.debug("Error happend while receive data: %s" % e) 
                break 
            else: 
              self.fd.write(data) 
              self.fd.flush() 
              self.writen_size += len(data) 
              if len(data) == second_part_size: 
                break 
        elif filesize <= self.chunk_size: 
          while 1: 
            try: 
              data = self.recv(filesize) 
              #self.logger.debug("handle_read()->%s size.",len(data)) 
            except socket.error,e: 
              if e.args[0] == errno.EWOULDBLOCK: 
                print "EWOULDBLOCK" 
                time.sleep(1) 
              else: 
                #self.logger.debug("Error happend while receive data: %s" % e) 
                break 
            else: 
              self.fd.write(data) 
              self.fd.flush() 
              self.writen_size += len(data) 
              if len(data) == filesize: 
                break 
        self.logger.debug("File size: %s" % self.writen_size) 
    class SyncServer(asyncore.dispatcher): 
      def __init__(self,host,port): 
        asyncore.dispatcher.__init__(self) 
        self.debug = True 
        self.logger = logging.getLogger(self.__class__.__name__) 
        self.create_socket(socket.AF_INET,socket.SOCK_STREAM) 
        self.set_reuse_addr() 
        self.bind((host,port)) 
        self.listen(2000) 
      def handle_accept(self): 
        client_socket = self.accept() 
        if client_socket is None: 
          pass 
        else: 
          sock, addr = client_socket 
          #self.logger.debug("Incoming connection from %s" % repr(addr)) 
          handler = RequestHandler(sock=sock) 
    class RunServer(threading.Thread): 
      def __init__(self): 
        super(RunServer,self).__init__() 
        self.daemon = False 
      def run(self): 
        server = SyncServer('',9999) 
        asyncore.loop(use_poll=True) 
    def StartServer(): 
      logging.basicConfig(level=logging.DEBUG, 
                format='%(name)s: %(message)s', 
                ) 
      RunServer().start() 
      #MakeGraph().start() 
    if __name__ == '__main__': 
      StartServer()

客户端:


    # monitor path with inotify(python module), and send them to remote server.# 
    # use sendfile(2) instead of send function in socket, if we have python-sendfile installed.# 
    import socket 
    import time 
    import os 
    import sys 
    import struct 
    import threading 
    import Queue 
    try: 
       import pyinotify 
    except (ImportError, ImportWarnning): 
       print "Hope this information can help you:" 
       print "Can not find pyinotify module in sys path, just run [apt-get install python-pyinotify] in ubuntu." 
       sys.exit(1) 
    try: 
       from sendfile import sendfile 
    except (ImportError,ImportWarnning): 
       pass 
    filetype_filter = [".rrd",".xml"] 
    def check_filetype(pathname): 
       for suffix_name in filetype_filter: 
         if pathname[-4:] == suffix_name: 
           return True 
       try: 
         end_string = pathname.rsplit('.')[-1:][0] 
         end_int = int(end_string) 
       except: 
         pass 
       else: 
         # means pathname endwith digit 
         return False 
    class sync_file(threading.Thread): 
       def __init__(self, addr, events_queue): 
         super(sync_file,self).__init__() 
         self.daemon = False 
         self.queue = events_queue 
         self.addr = addr 
         self.chunk_size = 1024 
       def run(self): 
         while 1: 
           event = self.queue.get() 
           if check_filetype(event.pathname): 
             print time.asctime(),event.maskname, event.pathname 
             filepath = event.path.split('/')[-1:][0] 
             filename = event.name 
             filesize = os.stat(os.path.join(event.path, filename)).st_size 
             sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
             filepath_len = len(filepath) 
             filename_len = len(filename) 
             sock.connect(self.addr) 
             offset = 0 
             data = struct.pack("!LL128s128sL",filepath_len, filename_len, filepath,filename,filesize) 
             fd = open(event.pathname,'rb') 
             sock.sendall(data) 
             if "sendfile" in sys.modules: 
               # print "use sendfile(2)" 
               while 1: 
                 sent = sendfile(sock.fileno(), fd.fileno(), offset, self.chunk_size) 
                 if sent == 0: 
                   break 
                 offset += sent 
             else: 
               # print "use original send function" 
               while 1: 
                 data = fd.read(self.chunk_size) 
                 if not data: break 
                 sock.send(data) 
             sock.close() 
             fd.close() 
    class EventHandler(pyinotify.ProcessEvent): 
       def __init__(self, events_queue): 
         super(EventHandler,self).__init__() 
         self.events_queue = events_queue 
       def my_init(self): 
         pass 
       def process_IN_CLOSE_WRITE(self,event): 
         self.events_queue.put(event) 
       def process_IN_MOVED_TO(self,event): 
         self.events_queue.put(event) 
    def start_notify(path, mask, sync_server): 
       events_queue = Queue.Queue() 
       sync_thread_pool = list() 
       for i in range(500): 
         sync_thread_pool.append(sync_file(sync_server, events_queue)) 
       for i in sync_thread_pool: 
         i.start() 
       wm = pyinotify.WatchManager() 
       notifier = pyinotify.Notifier(wm,EventHandler(events_queue)) 
       wdd = wm.add_watch(path,mask,rec=True) 
       notifier.loop() 
    def do_notify(): 
       perfdata_path = '/var/lib/pnp4nagios/perfdata' 
       mask = pyinotify.IN_CLOSE_WRITE|pyinotify.IN_MOVED_TO 
       sync_server = ('127.0.0.1',9999) 
       start_notify(perfdata_path,mask,sync_server) 
    if __name__ == '__main__': 
       do_notify()

python监视线程池


    #!/usr/bin/python 
    import threading 
    import time 
    class Monitor(threading.Thread): 
      def __init__(self, *args,**kwargs): 
        super(Monitor,self).__init__() 
        self.daemon = False 
        self.args = args 
        self.kwargs = kwargs 
        self.pool_list = [] 
      def run(self): 
        print self.args 
        print self.kwargs 
        for name,value in self.kwargs.items(): 
          obj = value[0] 
          temp = {} 
          temp[name] = obj 
          self.pool_list.append(temp) 
        while 1: 
          print self.pool_list 
          for name,value in self.kwargs.items(): 
            obj = value[0] 
            parameters = value[1:] 
            died_threads = self.cal_died_thread(self.pool_list,name)
            print "died_threads", died_threads 
            if died_threads >0: 
              for i in range(died_threads): 
                print "start %s thread..." % name 
                t = obj[0].__class__(*parameters) 
                t.start() 
                self.add_to_pool_list(t,name) 
            else: 
              break 
          time.sleep(0.5) 
      def cal_died_thread(self,pool_list,name): 
        i = 0 
        for item in self.pool_list: 
          for k,v in item.items(): 
            if name == k: 
              lists = v 
        for t in lists: 
          if not t.isAlive(): 
            self.remove_from_pool_list(t) 
            i +=1 
        return i 
      def add_to_pool_list(self,obj,name): 
        for item in self.pool_list: 
          for k,v in item.items(): 
            if name == k: 
              v.append(obj) 
      def remove_from_pool_list(self, obj): 
        for item in self.pool_list: 
          for k,v in item.items(): 
            try: 
              v.remove(obj) 
            except: 
              pass 
            else: 
              return

使用方法:


    rrds_queue = Queue.Queue() 
      make_rrds_pool = [] 
      for i in range(5): 
        make_rrds_pool.append(MakeRrds(rrds_queue)) 
      for i in make_rrds_pool: 
        i.start() 
      make_graph_pool = [] 
      for i in range(5): 
        make_graph_pool.append(MakeGraph(rrds_queue)) 
      for i in make_graph_pool: 
        i.start() 
      monitor = Monitor(make_rrds_pool=(make_rrds_pool, rrds_queue), \ 
               make_graph_pool=(make_graph_pool, rrds_queue)) 
      monitor.start()

解析:

1. 接受字典参数,value为一个元组,第一个元素是线程池,后面的都是参数。
2. 每0.5秒监视线程池中的线程数量,如果线程死掉了,记录死掉线程的数目,再启动同样数量的线程。
3. 如果没有线程死去,则什么也不做。

从外部调用Django模块


    import os 
    import sys 
    sys.path.insert(0,'/data/cloud_manage') 
    from django.core.management import setup_environ 
    import settings 
    setup_environ(settings) 
    from common.monitor import Monitor 
    from django.db import connection, transaction

前提就是,要新建一个django的project,这里我们新建了一个cloud_manage.
这样不仅可以调用django自身的模块,还能调用project本身的东西。

希望本文所述对大家的Python程序设计有所帮助。

 相关推荐

刘强东夫妇:“移民美国”传言被驳斥

京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。

发布于:1年以前  |  808次阅读  |  详细内容 »

博主曝三大运营商,将集体采购百万台华为Mate60系列

日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。

发布于:1年以前  |  770次阅读  |  详细内容 »

ASML CEO警告:出口管制不是可行做法,不要“逼迫中国大陆创新”

据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。

发布于:1年以前  |  756次阅读  |  详细内容 »

抖音中长视频App青桃更名抖音精选,字节再发力对抗B站

今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。

发布于:1年以前  |  648次阅读  |  详细内容 »

威马CDO:中国每百户家庭仅17户有车

日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。

发布于:1年以前  |  589次阅读  |  详细内容 »

研究发现维生素 C 等抗氧化剂会刺激癌症生长和转移

近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。

发布于:1年以前  |  449次阅读  |  详细内容 »

苹果据称正引入3D打印技术,用以生产智能手表的钢质底盘

据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。

发布于:1年以前  |  446次阅读  |  详细内容 »

千万级抖音网红秀才账号被封禁

9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...

发布于:1年以前  |  445次阅读  |  详细内容 »

亚马逊股东起诉公司和贝索斯,称其在购买卫星发射服务时忽视了 SpaceX

9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。

发布于:1年以前  |  444次阅读  |  详细内容 »

苹果上线AppsbyApple网站,以推广自家应用程序

据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。

发布于:1年以前  |  442次阅读  |  详细内容 »

特斯拉美国降价引发投资者不满:“这是短期麻醉剂”

特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。

发布于:1年以前  |  441次阅读  |  详细内容 »

光刻机巨头阿斯麦:拿到许可,继续对华出口

据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。

发布于:1年以前  |  437次阅读  |  详细内容 »

马斯克与库克首次隔空合作:为苹果提供卫星服务

近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。

发布于:1年以前  |  430次阅读  |  详细内容 »

𝕏(推特)调整隐私政策,可拿用户发布的信息训练 AI 模型

据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。

发布于:1年以前  |  428次阅读  |  详细内容 »

荣耀CEO谈华为手机回归:替老同事们高兴,对行业也是好事

9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。

发布于:1年以前  |  423次阅读  |  详细内容 »

AI操控无人机能力超越人类冠军

《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。

发布于:1年以前  |  423次阅读  |  详细内容 »

AI生成的蘑菇科普书存在可致命错误

近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。

发布于:1年以前  |  420次阅读  |  详细内容 »

社交媒体平台𝕏计划收集用户生物识别数据与工作教育经历

社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”

发布于:1年以前  |  411次阅读  |  详细内容 »

国产扫地机器人热销欧洲,国产割草机器人抢占欧洲草坪

2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。

发布于:1年以前  |  406次阅读  |  详细内容 »

罗永浩吐槽iPhone15和14不会有区别,除了序列号变了

罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。

发布于:1年以前  |  398次阅读  |  详细内容 »
 相关文章
Android插件化方案 5年以前  |  237299次阅读
vscode超好用的代码书签插件Bookmarks 2年以前  |  8145次阅读
 目录