Why is this needed?
Once I was faced with the task of copying a large number of files from an ftp server. It was necessary to make a backup. It would seem that it could be easier! But alas, nothing ready to work as quickly for my conditions could not be found.
Situation
It was necessary to periodically fetch a couple of hundred files from the ftp server under Windows. A lot of little things and a few very large files. A total of about 500 GB. The server is a vps located quite far abroad. During the day, the car is highly loaded, early at night, routine maintenance is performed, in total, there are 5 hours to download at most.
None of the utilities I have reviewed were able to cope efficiently and in the allotted time. Well, there is nowhere to go, a normal backup system has not yet been bought, which means we arm ourselves with an editor or Python IDE and go! Adventure!
Config
We will put all the parameters for the script in a separate file for convenience.
Config template:
host = 'ip.ip.ip.ip'
user = 'ftpusername'
passwd = 'ftppassword'
basepath = '/path/to/backup/folder' # ,
max_threads = 20 #
log_path = '\path\to\logfile'
statusfilepath = '\path\to\statusfile'
We save the config with the .py extension and import it at the beginning of our script. You can import directly into the script's namespace, but I made the construction a bit like a crutch in the main part of my script:
if __name__ == "__main__":
host = config.host
user = config.user
passwd = config.passwd
basepath = config.basepath # ,
max_threads = config.max_threads
log_path = config.log_path
statusfilepath = config.statusfilepath
main()
In the beginning there was a list
ftp , , , ftp- . , , - .
. , , , -.
- ftp:
class MyFtp (ftplib.FTP):
""" , """
def __init__(self):
self.host = host
self.user = user
self.passwd = passwd
self.timeout = 1800
super(MyFtp, self).__init__()
def connect(self):
super(MyFtp, self).connect(self.host, timeout=self.timeout)
def login(self):
super(MyFtp, self).login(user=self.user, passwd=self.passwd)
def quit(self):
super(MyFtp,self).quit()
. ftplib, .
:
class FileList:
""" """
def __init__(self):
self.ftp = None
self.file_list = []
def connect_ftp(self):
import sys
self.ftp = MyFtp()
self.ftp.connect()
self.ftp.login()
self.ftp.__class__.encoding = sys.getfilesystemencoding()
def get_list(self, name):
""" ftp-."""
import os
for dirname in self.ftp.mlsd(str(name), facts=["type"]):
if dirname[1]["type"] == "file":
entry_file_list = {}
entry_file_list['remote_path'] = name #
entry_file_list['filename'] = dirname[0] #
self.file_list.append(entry_file_list)
else:
path = os.path.join(name, dirname[0])
self.get_list(path)
def get_next_file(self):
return self.file_list.pop()
def len(self):
return len(self.file_list)
, , , , , .
logging. , .
class MyLogger:
""" """
def __init__(self):
self.logger = None
def start_file_logging(self, logger_name, log_path):
""" """
import logging
self.logger = logging.getLogger(logger_name)
self.logger.setLevel(logging.INFO)
try:
fh = logging.FileHandler(log_path)
except FileNotFoundError:
log_path = "downloader.log"
fh = logging.FileHandler(log_path)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
self.logger.addHandler(fh)
def start_rotate_logging(self, logger_name, log_path, max_bytes=104857600, story_backup=5):
""" """
import logging
from logging.handlers import RotatingFileHandler
self.logger = logging.getLogger(logger_name)
self.logger.setLevel(logging.INFO)
try:
fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup)
except FileNotFoundError:
log_path = "downloader.log"
fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
self.logger.addHandler(fh)
def add(self, msg):
self.logger.info(str(msg))
def add_error(self, msg):
self.logger.error(str(msg))
, .
. , :
class BaseFileDownload(threading.Thread):
""" """
count = 0
def __init__(self, rpath, filename, log):
threading.Thread.__init__(self)
self.remote_path = rpath
self.filename = filename
self.ftp = None
self.command = None
self.currentpath = None
self.log = log
self.__class__.count += 1 #
def __del__(self):
self.__class__.count -= 1
def connect(self):
""" ftp"""
import sys
self.ftp = MyFtp()
self.ftp.connect()
self.ftp.login()
self.ftp.__class__.encoding = sys.getfilesystemencoding()
def run(self):
""" """
import os
self.connect()
self.command = str(bytes('RETR ', encoding='latin-1'), encoding='utf-8')
self.currentpath = os.path.join(basepath, self.remote_path[3:])
self.ftp.cwd(self.remote_path)
if not os.path.exists(self.currentpath):
os.makedirs(self.currentpath, exist_ok=True)
self.host_file = os.path.join(self.currentpath, self.filename)
try:
with open(self.host_file, 'wb') as local_file:
self.log.add("Start downloading " + self.filename)
self.ftp.retrbinary(self.command + self.filename, local_file.write)
self.log.add("Downloading " + self.filename + " complete")
except ftplib.error_perm:
self.log.add_error('Perm error')
self.ftp.quit()
count. : , , , .
run - threading ( !), .
, os.makedirs.
-
. zabbix, , - , .
The class for working with this files looks like this:
class StatusFile:
""" ."""
def __init__(self):
self.msg = ''
def setstatus(self, msg):
global statusfilepath
with open(statusfilepath, 'w') as status_file:
status_file.write(msg)
Multithreading
And finally, the main script function itself, which works with download streams:
def main():
import os
import datetime
import time
log = MyLogger()
log.start_rotate_logging("DownloaderLog", os.path.join(log_path, "download_backup.log")) #
now = datetime.datetime.today().strftime("%Y%m%d")
global basepath
basepath = os.path.join(basepath, now) # ,
list_file = FileList()
list_file.connect_ftp()
list_file.get_list("..")
for i in range(list_file.len()):
flag = True
while flag: #
if BaseFileDownload.count < max_threads:
curfile = list_file.get_next_file()
threadid = BaseFileDownload(curfile["remote_path"], curfile["filename"], log)
threadid.start()
flag = False
else:
time.sleep(20)
log.add("Downloading files complete")
statusfile = StatusFile()
statusfile.setstatus("Downloading at " + str(datetime.datetime.now()) + " finishing successful")
Here we start logging, get a list of files (it is stored in memory).
In an eternal while loop, we check the number of simultaneously running downloads and, if necessary, start additional threads.
The entire source code can be found here .