[Netdisk project log] 20210601: Seafile offline download system development (2)

Hey, Happy Children's Day~


Picking up where we left off

We are going to develop an offline download system. Following the plan from last time, the new system will be added to seafevents. The general logic is:

  • The user sends a request containing a link to the Django side (seahub);
  • Django receives the link and forwards it to the seafevents process as an event;
  • seafevents receives the event and adds a task to the database with status "waiting";
  • A periodic task in seafevents checks for new tasks every 5 seconds;
  • When it finds a new task, it reads the task information and sets the status to "downloading";
  • It calls the Aria2 interface to download the file to a temporary location;
  • It calls SeafileAPI to add the temporary file to SeafileFS;
  • It sets the status to "done".

In addition, two special cases are considered:

  • If the Aria2 download fails: set the status to "failed" and write the reason in the comment field.
  • If the service stops halfway through a download: the next time the service starts, it first checks all tasks in "downloading" status and calls Aria2 to restart them.
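The status life cycle above can be sketched as a small state machine. This is a minimal illustration, assuming the numeric codes from the model's status column shown later (1=Waiting, 2=Downloading, 3=OK, 4=Error); ALLOWED_TRANSITIONS, can_transition and transition are hypothetical names, not part of seafevents.

```python
from enum import IntEnum


class OfflineDownloadStatus(IntEnum):
    # Codes assumed to match the comment on the model's `status` column.
    UNKNOWN = 0
    WAITING = 1
    DOWNLOADING = 2
    OK = 3
    ERROR = 4


S = OfflineDownloadStatus

# Hypothetical transition table for the flow described above.
# DOWNLOADING -> DOWNLOADING covers the restart-after-crash case.
ALLOWED_TRANSITIONS = {
    S.WAITING: {S.DOWNLOADING},
    S.DOWNLOADING: {S.DOWNLOADING, S.OK, S.ERROR},
    S.OK: set(),      # terminal
    S.ERROR: set(),   # terminal; the reason lives in the comment field
}


def can_transition(current, new):
    """True if moving from `current` to `new` is allowed."""
    return new in ALLOWED_TRANSITIONS.get(current, set())


def transition(current, new):
    """Return the new status, or raise ValueError for an illegal move."""
    if not can_transition(current, new):
        raise ValueError(f"illegal transition {current.name} -> {new.name}")
    return new
```

Listing DOWNLOADING -> DOWNLOADING explicitly makes the "restart interrupted downloads" rule part of the model instead of a special case.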

Study the event mechanism of seafevents

As mentioned in the previous post, seafevents can not only run scheduled tasks but also receive events from seahub and seafile; the audit system relies on this:

handlers.add_handler('seahub.stats:user-login', UserLoginEventHandler)
handlers.add_handler('seaf_server.stats:web-file-upload', FileStatsEventHandler)
handlers.add_handler('seaf_server.stats:web-file-download', FileStatsEventHandler)
handlers.add_handler('seaf_server.stats:link-file-upload', FileStatsEventHandler)
handlers.add_handler('seaf_server.stats:link-file-download', FileStatsEventHandler)
handlers.add_handler('seaf_server.stats:sync-file-upload', FileStatsEventHandler)
handlers.add_handler('seaf_server.stats:sync-file-download', FileStatsEventHandler)

As you can see, each of the six file operations is bound to a handler here. So where are these events sent from?

I searched seahub for web-file-upload but found nothing useful. Evidently the six file-operation events are not emitted by seahub, and their seaf_server prefix confirms this. Then I searched for user-login, and this time I found something useful:

try:
    seafile_api.publish_event('seahub.stats', 'user-login\t%s\t%s\t%s' % (email, timestamp, org_id))
except Exception as e:
    logger.error('Error when sending user-login message: %s' % str(e))

So it is a publish_event method, but the arguments are simply joined with \t. Simple and crude, I have to say.

Note, however, that seahub sends events to seafevents by calling seafile_api. In other words, does the event actually go from seahub to seafile-server first, and then from seafile-server to seafevents? Let's verify.

Open the seafile-server code, search for publish_event, and find this statement:

json_t *msg = json_object();
json_object_set_new (msg, "content", json_string(content));
json_object_set_new (msg, "ctime", json_integer(time(NULL)));
g_async_queue_push (async_queue, msg);

It turns out to be just a GAsyncQueue. And on the seafevents side:

while 1:
    try:
        msg = seafile_api.pop_event(channel)
    except Exception as e:
        logger.error('Failed to get event: %s' % e)
        time.sleep(3)
        continue
    if msg:
        try:
            message_handler.handle_message(session, channel, msg)
        except Exception as e:
            logger.error(e)
        finally:
            session.close()
    else:
        time.sleep(0.5)

Whenever the queue is empty, it sleeps 0.5 seconds and calls seafile_api.pop_event again. Also simple and crude!
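To make the round trip concrete, here is a pure-Python stand-in for the publish/pop pair: a dict of per-channel queues approximates seafile-server's GAsyncQueue, and poll() mirrors the seafevents loop above. All names are hypothetical; the real calls go through seafile_api over searpc, and the idle-round limit exists only so the sketch terminates.

```python
import queue
import time

# Hypothetical stand-in for the per-channel async queues inside seafile-server.
_queues = {}


def publish_event(channel, content):
    """Like publish_event: wrap the tab-separated content with a ctime and enqueue it."""
    msg = {"content": content, "ctime": int(time.time())}
    _queues.setdefault(channel, queue.Queue()).put(msg)


def pop_event(channel):
    """Like pop_event: return the next message for the channel, or None if empty."""
    q = _queues.get(channel)
    if q is None:
        return None
    try:
        return q.get_nowait()
    except queue.Empty:
        return None


def poll(channel, handle, max_idle_rounds=3, idle_sleep=0.5):
    """Mirror the seafevents loop: handle a message if present, else sleep."""
    idle = 0
    while idle < max_idle_rounds:  # the real loop is `while 1`
        msg = pop_event(channel)
        if msg:
            idle = 0
            handle(msg)
        else:
            idle += 1
            time.sleep(idle_sleep)
```

Non-blocking pops plus a short sleep keep the consumer simple at the cost of up to 0.5 s latency per idle period.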

So here we learned something new:

  • seafevents depends on a working seafile-server, to which it connects via searpc;
  • When seahub loads seafevents, it only reads the database settings from the seafevents configuration file and connects to that database so that it can reuse the query functions in seafevents; the seafevents app (main) itself is not started by seahub;
  • Events sent by seahub reach seafevents via seafile-server.

Write handler

To imitate the handler in the statistics module, I also made a handler:

import logging

def OfflineDownloadEventHandler(session, msg):
    # Content: 'offline-file-upload\t<repo_id>\t<path>\t<user_name>'; as with
    # user-login, the first field is the event type used for routing.
    elements = msg['content'].split('\t')
    if len(elements) != 4:
        logging.warning("got bad message: %s", elements)
        return
    repo_id = elements[1]
    path = elements[2]
    user_name = elements[3]

    # add_offline_download_record(appconfig.session_cls(), repo_id, path, user_name)
    add_offline_download_record(session, repo_id, path, user_name)


def register_handlers(handlers):
    handlers.add_handler('seahub.stats:offline-file-upload', OfflineDownloadEventHandler)

Seahub does not emit this event yet; we'll leave the handler in place for now.
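A simplified registry shows how a "channel:event-type" key reaches the handler. This is a sketch in the spirit of seafevents' handlers, not its actual code: MessageHandler and offline_download_handler are hypothetical, and it assumes (as with the user-login message above) that the event type is the first tab-separated field and that the handler receives the full message. A plain list stands in for the database session.

```python
import logging


class MessageHandler:
    """Hypothetical, simplified registry: 'channel:event-type' -> handler functions."""

    def __init__(self):
        self._handlers = {}

    def add_handler(self, msg_type, func):
        self._handlers.setdefault(msg_type, []).append(func)

    def handle_message(self, session, channel, msg):
        # Routing key = channel + ':' + first tab-separated field of the content.
        event_type = msg["content"].split("\t", 1)[0]
        funcs = self._handlers.get(f"{channel}:{event_type}", [])
        for func in funcs:
            try:
                func(session, msg)  # the handler gets the full message
            except Exception:
                logging.exception("error when handling %s", msg)
        return len(funcs)  # number of handlers invoked


def offline_download_handler(session, msg):
    # Expected content: 'offline-file-upload\t<repo_id>\t<path>\t<user>'
    elements = msg["content"].split("\t")
    if len(elements) != 4:
        logging.warning("got bad message: %s", elements)
        return
    _, repo_id, path, user_name = elements
    session.append((repo_id, path, user_name))  # stand-in for a DB insert
```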

Configuration file

I want the configuration to take the following form in seafevents.conf:

# Enable Aria2 offline download
[OFFLINE DOWNLOAD]
enabled = true
tempdir = /tmp/offline-download
workers = 1
max-size = 800mb
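Parsing that section needs nothing beyond the standard library. A minimal sketch, assuming binary units for max-size (800mb = 800 × 1024² bytes) and the fallback values shown; parse_offline_download_conf and parse_size are hypothetical helpers, not the get_offline_download_conf further below.

```python
import configparser
import re

_UNITS = {"": 1, "b": 1, "kb": 1024, "mb": 1024 ** 2, "gb": 1024 ** 3}


def parse_size(text):
    """Turn '800mb', '2GB' or '1024' into a byte count (binary units assumed)."""
    m = re.fullmatch(r"\s*(\d+)\s*(kb|mb|gb|b)?\s*", text.lower())
    if not m:
        raise ValueError(f"bad size: {text!r}")
    return int(m.group(1)) * _UNITS[m.group(2) or ""]


def parse_offline_download_conf(config_text):
    """Read the [OFFLINE DOWNLOAD] section; a missing section means disabled."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    section = "OFFLINE DOWNLOAD"
    if not parser.has_section(section):
        return {"enabled": False}
    sec = parser[section]
    return {
        "enabled": sec.getboolean("enabled", fallback=False),
        "tempdir": sec.get("tempdir", fallback="/tmp/offline-download"),
        "workers": sec.getint("workers", fallback=1),
        "max_size": parse_size(sec.get("max-size", fallback="800mb")),
    }
```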

Imitating the file search system, I first defined a global HAS_OFFLINE_DOWNLOAD variable:

# offline download related
HAS_OFFLINE_DOWNLOAD = False
if EVENTS_CONFIG_FILE:
    def check_offline_download_enabled():
        enabled = False
        if hasattr(seafevents, 'is_offline_download_enabled'):
            enabled = seafevents.is_offline_download_enabled(parsed_events_conf)

            if enabled:
                logging.debug('offline download: enabled')
            else:
                logging.debug('offline download: not enabled')
        return enabled

    HAS_OFFLINE_DOWNLOAD = check_offline_download_enabled()

Then in the corresponding place, I wrote the detection function:

def get_offline_download_conf(config):
    '''Parse offline download related options from seafevents.conf'''
    if not has_offline_download_tools():
        logging.debug('offline downloader is not enabled because Aria2 is not found')
        return dict(enabled=False)

    section_name = 'OFFLINE DOWNLOAD'
    key_enabled = 'enabled'
    ......

There are more details I won't include here; in any case, I put everything I learned before to use.

I have to say that the earlier experience of borrowing (stealing) three features from the Pro edition taught me a lot, and now I'm fairly familiar with all this (not really).

Database part

Imitating the virus scanning system, I created the OfflineDownloadRecord model and wrote a number of database CRUD statements:

from sqlalchemy import Column, Integer, String, Text, DateTime, BigInteger, SmallInteger

# Base is the declarative base shared by the other seafevents models.
class OfflineDownloadRecord(Base):
    __tablename__ = 'OfflineDownloadRecord'

    odr_id = Column(Integer, primary_key=True, autoincrement=True)
    repo_id = Column(String(length=36), nullable=False, index=True)
    path = Column(Text, nullable=False)
    url = Column(Text, nullable=False)  # the worker reads row.url, so the model needs it
    owner = Column(String(length=255), nullable=False)  # user email, not a DateTime
    timestamp = Column(DateTime(), nullable=False)
    size = Column(BigInteger, nullable=False, default=0)
    status = Column(SmallInteger, nullable=False, comment='0=Unknown, 1=Waiting, 2=Downloading, 3=OK, 4=Error')
    comment = Column(Text, nullable=False)
    __table_args__ = {'extend_existing':True}

The create/read/update/delete statements are too numerous to list here. One thing worth saying, though: SQLAlchemy, as used in seafevents, is really powerful and a pleasure to work with.
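Since the CRUD statements are omitted, here is a sketch of the three operations the worker relies on, using the standard sqlite3 module instead of SQLAlchemy so it runs anywhere. The table mirrors the model above, including a url column that the worker reads; add_record, get_by_status and set_status are hypothetical names.

```python
import sqlite3
from datetime import datetime

WAITING, DOWNLOADING, OK, ERROR = 1, 2, 3, 4  # codes from the status column comment

SCHEMA = """
CREATE TABLE IF NOT EXISTS OfflineDownloadRecord (
    odr_id    INTEGER PRIMARY KEY AUTOINCREMENT,
    repo_id   VARCHAR(36) NOT NULL,
    path      TEXT NOT NULL,
    url       TEXT NOT NULL,
    owner     VARCHAR(255) NOT NULL,
    timestamp TEXT NOT NULL,
    size      BIGINT NOT NULL DEFAULT 0,
    status    SMALLINT NOT NULL,
    comment   TEXT NOT NULL DEFAULT ''
)
"""


def add_record(conn, repo_id, path, url, owner):
    """Insert a new task in WAITING status and return its odr_id."""
    cur = conn.execute(
        "INSERT INTO OfflineDownloadRecord (repo_id, path, url, owner, timestamp, status)"
        " VALUES (?, ?, ?, ?, ?, ?)",
        (repo_id, path, url, owner, datetime.now().isoformat(), WAITING),
    )
    conn.commit()
    return cur.lastrowid


def get_by_status(conn, status):
    """Return (odr_id, repo_id, path, url, owner) rows with the given status."""
    return conn.execute(
        "SELECT odr_id, repo_id, path, url, owner FROM OfflineDownloadRecord"
        " WHERE status = ? ORDER BY odr_id",
        (status,),
    ).fetchall()


def set_status(conn, odr_id, status, comment=""):
    """Update a task's status; the comment holds the failure reason, if any."""
    conn.execute(
        "UPDATE OfflineDownloadRecord SET status = ?, comment = ? WHERE odr_id = ?",
        (status, comment, odr_id),
    )
    conn.commit()
```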

Define a scheduled task

As mentioned earlier, we need to fetch all new tasks every 5 seconds.

Imitating Virus Scan, let's also do a timed task.

Implement Worker

Before we can schedule tasks, we need the task itself, right?

First, imitating Virus Scanner, we create a data transfer class specifically for the Worker:

class OfflineDownloadTask(object):
    def __init__(self, odr_id, repo_id, path, url, owner):
        self.odr_id = odr_id
        self.repo_id = repo_id
        self.path = path    # The path is actually the dir
        self.url = url
        self.owner = owner

Then define our thread execution class and method:

class OfflineDownload(object):
    def __init__(self, settings):
        self.settings = settings
        self.db_oper = DBOper(settings)
        # One long-lived pool, created once and reused by every start() call.
        self.thread_pool = ThreadPool(self.download_file, self.settings.max_workers)
        self.thread_pool.start()

    def restore(self):
        # Check and restore all the interrupted tasks.
        task_list = self.db_oper.get_offline_download_tasks_by_status(OfflineDownloadStatus.DOWNLOADING)

        for row in task_list:
            self.thread_pool.put_task(OfflineDownloadTask(row.odr_id, row.repo_id, row.path, row.url, row.owner))

    def start(self):
        # Pick up all new (waiting) tasks, mark them as downloading, and queue them.
        task_list = self.db_oper.get_offline_download_tasks_by_status(OfflineDownloadStatus.WAITING)

        for row in task_list:
            self.db_oper.set_record_status(row.odr_id, OfflineDownloadStatus.DOWNLOADING)
            self.thread_pool.put_task(OfflineDownloadTask(row.odr_id, row.repo_id, row.path, row.url, row.owner))

    ......

Two functions are defined first: restore() handles the case mentioned above where seafevents exits unexpectedly, and start() is the regular scan for new tasks. We plan to run start() every 5 seconds.

The constructor contains a design choice that differs from Virus Scan. I reuse the ThreadPool class from the Virus Scan system directly, but unlike Virus Scan, I create the thread pool once in the Worker's constructor, and start() simply pushes new tasks into that existing pool. Virus Scan creates a new thread pool on every run: its scan interval is 1 hour, so the creation overhead doesn't matter, and keeping a pool open the whole time would waste resources. My task runs very frequently, so keeping the pool open pays off.
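The "create once, feed forever" pool can be sketched with queue.Queue and a few daemon threads. LongLivedPool is a hypothetical stand-in, not the Virus Scan ThreadPool; only the put_task name is borrowed from the code above.

```python
import logging
import queue
import threading


class LongLivedPool:
    """Hypothetical pool: worker threads start once and are then fed tasks."""

    def __init__(self, func, workers):
        self._func = func
        self._tasks = queue.Queue()
        self._threads = [
            threading.Thread(target=self._worker, daemon=True) for _ in range(workers)
        ]

    def start(self):
        for t in self._threads:
            t.start()

    def put_task(self, task):
        self._tasks.put(task)

    def join(self):
        """Block until every queued task has been processed."""
        self._tasks.join()

    def _worker(self):
        while True:
            task = self._tasks.get()  # blocks while idle, no busy waiting
            try:
                self._func(task)
            except Exception:
                # Keep the worker alive: one failed download must not kill the thread.
                logging.exception("task %r failed", task)
            finally:
                self._tasks.task_done()
```

join() is only there for testing and shutdown; in the service the timer keeps calling put_task(), and idle workers simply block on the queue.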

I won't post the main execution function here; its steps are:

  • Obtain task information from the queue;
  • Create a temporary folder;
  • Call the Aria2 interface to download the file to the temporary folder;
  • Call seafile_API to store the files in the temporary folder in seafileFS;
  • Delete temporary files and temporary folders.

Why not create a temporary file directly? Because the download is done by an external tool, there is no convenient way to know the downloaded file's name in advance. With a temporary folder, the original file name is kept during the download, and once it completes, os.listdir gives us the name of the downloaded file.

I hit a small problem here: neither os.unlink nor os.rmdir can remove a non-empty folder directly. So I had to delete all the contents first and then remove the folder itself:

if tdir is not None and len(tdir) > 0:
    file_list = os.listdir(tdir)
    for item in file_list:
        os.unlink(os.path.join(tdir, item))
    os.rmdir(tdir)
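For reference, shutil.rmtree removes a directory and everything under it in one call, and unlike the loop above it also copes with nested subdirectories (os.unlink fails on a directory). Here is a sketch of the whole temp-folder flow using it; download and store are hypothetical callables standing in for the Aria2 call and the seafile_api upload.

```python
import os
import shutil
import tempfile


def download_into_temp(url, download, store, base_dir=None):
    """Download into a fresh temp dir, pass each resulting entry to `store`, clean up.

    `download(url, dest_dir)` stands in for the Aria2 call and `store(path)` for
    the seafile_api upload; both are hypothetical callables supplied by the caller.
    """
    tdir = tempfile.mkdtemp(prefix="offline-", dir=base_dir)
    try:
        download(url, tdir)
        # The downloader chose the file name, so discover it by listing the dir.
        names = sorted(os.listdir(tdir))
        for name in names:
            store(os.path.join(tdir, name))
        return names
    finally:
        # Removes the directory and all of its contents, even nested ones.
        shutil.rmtree(tdir, ignore_errors=True)
```

The try/finally also guarantees cleanup when the download or upload raises.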

Registering the Worker and the timer class

In the tasks folder of seafevents we created a file named offline_downloader.py and, imitating Virus Scan, defined a Worker class and a timer class in it:

import logging
from threading import Thread, Event

class OfflineDownloader(object):
    def __init__(self, config_file):
        self.settings = Settings(config_file)
        self.downloader = OfflineDownload(self.settings)

    def is_enabled(self):
        return self.settings.is_enabled()

    def start(self):
        logging.info("Start offline downloader, refresh interval = 5 sec")
        logging.info("Restoring interrupted download tasks...")
        self.downloader.restore()
        OfflineDownloadTimer(self.downloader, self.settings).start()


class OfflineDownloadTimer(Thread):
    def __init__(self, downloader, settings):
        Thread.__init__(self)
        self.settings = settings
        self.finished = Event()
        self.downloader = downloader

    def run(self):
        # Wake every 5 seconds; cancel() sets the event and ends the loop.
        while not self.finished.is_set():
            self.finished.wait(5)
            if not self.finished.is_set():
                self.downloader.start()

    def cancel(self):
        self.finished.set()

This also differs from Virus Scan: in start() I first call restore() to put the tasks left unfinished last time back into the queue, and then start the Timer. The Timer periodically runs the Worker's start() method to scan the database for new tasks.

Finally, in app.py, register and start it the same way as the other tasks.

There are many more details. This section is actually a summary I wrote after finishing the first version, so many details are missing and only the parts I consider critical are covered.

Writing Seahub's routes and view

In seahub/api2/endpoints/offline_download.py I wrote a view class whose get and put methods serve the task-list and add-task interfaces, respectively. The routes are defined in urls.py as follows:

## offline download
url(r'^api/v2.2/offline-download/tasks$', OfflineDownloadTasks.as_view(), name='api-v2.2-offline-download-tasks'),
url(r'^api/v2.2/offline-download/add$', OfflineDownloadTasks.as_view(), name='api-v2.2-offline-download-add'),

A v2.2 API, heh.

Here I changed my original plan: in the add-task handler, I write to the database directly. Originally I wanted seahub to send an event to the backend and let seafevents handle the database entirely. But after finishing the prototype I found two problems. First, the scan cycle is 5 seconds: after the user clicks the add button, the database may not be updated for several seconds, so the interface has nothing to show. Second, if the seafevents service isn't running, the user's offline download task cannot even be submitted. Either way, the user experience is poor.

Besides, in that design seafevents did nothing more than create a new database record as soon as it received the event. That round trip was clearly redundant (like taking off your pants to fart, as the Chinese saying goes). So in the end I removed the event entirely.
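With the event gone, the put handler has to validate its input itself before inserting a waiting record. A framework-free sketch of that check, assuming a 36-character repo id, an absolute path, and that only http, https and ftp URLs are accepted (magnet links would need separate handling); validate_add_request and ALLOWED_SCHEMES are hypothetical names.

```python
from urllib.parse import urlparse

ALLOWED_SCHEMES = {"http", "https", "ftp"}  # assumption: what the downloader should accept


def validate_add_request(data):
    """Return (cleaned, None) for a valid PUT body, or (None, error message)."""
    repo_id = (data.get("repo_id") or "").strip()
    path = (data.get("path") or "").strip()
    url = (data.get("url") or "").strip()
    if len(repo_id) != 36:  # repo ids are 36-character UUID strings
        return None, "invalid repo_id"
    if not path.startswith("/"):
        return None, "invalid path"
    parsed = urlparse(url)
    if parsed.scheme not in ALLOWED_SCHEMES or not parsed.netloc:
        return None, "invalid url"
    return {"repo_id": repo_id, "path": path, "url": url}, None
```

Rejecting schemes like file:// matters here because the downloader runs on the server.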

Front end

Ah, the front end, it really made me a little confused.

Fixing what I did last time

Remember the onOfflineDownload function we threaded through many layers of components last time? Yes, I finally got rid of it today. Hilarious: I had been busy with it all night for nothing. I was completely misled by the file upload button next to it. Uploading starts sending data as soon as the button is pressed, so its logic needs to live higher up. My offline download button only opens a dialog, yet I had passed a simple dialog-control function through all those layers.

So it became directly like this:

class DirOperationToolbar extends React.Component {

  constructor(props) {
    super(props);
    this.state = {
      fileType: '.md',
      isCreateFileDialogShow: false,
      isCreateFolderDialogShow: false,
      isOfflineDownloadDialogShow: false,
      isUploadMenuShow: false,
      isCreateMenuShow: false,
      isShareDialogShow: false,
      operationMenuStyle: '',
      isMobileOpMenuOpen: false
    };
  }

Then I put a new OfflineDownloadDialog directly into DirOperationToolbar's render() method, so DirOperationToolbar itself handles opening and closing the dialog.

The idea, briefly: keep an isOfflineDownloadDialogShow variable in state; in render(), check whether it is true, and if so render a modal dialog together with the component. Then give the dialog's close button an event handler that sets isOfflineDownloadDialogShow to false, and the dialog can be toggled freely.

Make a simple interface

In short, I have made a very, very simple interface, without localization, only displaying the original URL and the status of the task:


Seafile_API issues

As mentioned earlier, seafile_API is packaged as a library and published to npm, so it is not convenient to modify its JS code directly. However, we can simply imitate its implementation:

// We cannot modify the seafile-API lib, but we can add a raw method.
function getOfflineDownloadTask() {
  let url = seafileAPI.server + '/api/v2.2/offline-download/tasks';
  return seafileAPI.req.get(url, { });
}
function addOfflineDownloadTask(repoId, path, targetUrl) {
  let url = seafileAPI.server + '/api/v2.2/offline-download/add';
  return seafileAPI.req.put(url, { repo_id: repoId, path: path, url: targetUrl });
}

Heh, this is quite convenient too; the only downside is that it will get hard to manage if more such functions are added later.

Refresh list data regularly

Then I ran into a problem. Task status changes quickly, and you can't ask the user to reload a React app just to see whether a download has finished, right?

Therefore, I added a timer that automatically fetches the task list every 3 seconds and re-renders it.

To keep performance in check and avoid unnecessary requests (if every client polled every 3 seconds all the time, the load on the server would add up), I made a small optimization: the timer starts in componentDidMount and is cleared in componentWillUnmount. Since the dialog is only rendered while it is open, polling only happens while the user is looking at the list.

componentDidMount() {
  this.refreshList();
  // Keep the timer id on the instance: it is not render state, and
  // setState should not be called from componentWillUnmount.
  this.refreshTimer = setInterval(this.refreshList.bind(this), 3000);
}

componentWillUnmount() {
  clearInterval(this.refreshTimer);
  this.refreshTimer = null;
}

Ah, here is another problem I ran into at the time. By default, when setInterval invokes the callback, this no longer refers to the OfflineDownloadDialog instance but to something else, so the callback cannot call setState. Adding .bind(this) ensures that this inside the callback is the OfflineDownloadDialog instance.

Refresh the file list after uploading

There is also a smaller detail: after an offline download completes, the file doesn't appear in the file list right away, so the user has to refresh the page, which is a poor experience.

After digging through the code step by step, I found that the component defined in frontend/src/pages/lib-content-view/lib-content-view.js has a loadDirentList function, which loads the contents of a folder. (From earlier project work I already knew that a "dirent" is an entry in a folder.)

So we can pass loadDirentList down as a prop, layer by layer, to our OfflineDownloadDialog; when a download completes, we just call it to refresh the folder contents.

I won't repeat how to pass it down layer by layer; I've written enough about that before.

But there is another tricky question: how do we know that a download has finished? We only have an interface that returns the whole task list, and the code doesn't track individual tasks, so it seems hard to decide when to refresh.

My solution: keep the number of tasks with OK status in state, and each time a new list arrives, count the OK tasks again. If the count differs from the last one, refresh the file list once. This keeps the refresh overhead low.
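The counting trick, pulled out as a pure function. This is an illustrative sketch: check_refresh mirrors the approach above, and check_refresh_by_ids is a hypothetical variant that assumes each task in the JSON carries its odr_id. Comparing sets of finished ids avoids a blind spot of plain counting: if records can be removed, one task finishing while another finished record disappears in the same interval leaves the count unchanged.

```python
OK = 3  # status code for a finished task


def check_refresh(prev_ok_count, task_list):
    """Count-based check: return (should_refresh, new_ok_count)."""
    ok_count = sum(1 for task in task_list if task["status"] == OK)
    return ok_count != prev_ok_count, ok_count


def check_refresh_by_ids(prev_ok_ids, task_list):
    """Id-based variant: refresh only when a task id newly reaches OK."""
    ok_ids = frozenset(t["odr_id"] for t in task_list if t["status"] == OK)
    return bool(ok_ids - prev_ok_ids), ok_ids
```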

getOfflineDownloadTask().then((res) => {
  const taskList = res.data.data;
  // setState is asynchronous, so count from the response itself
  // rather than from this.state.taskList (which may still be stale).
  let okCnt = 0;
  for (let i = 0; i < taskList.length; i++) {
    if (taskList[i].status === 3) okCnt++;  // 3 = OK
  }
  if (okCnt !== this.state.ent_ok_num) this.props.refreshDirent();
  this.setState({
    taskList: taskList,
    errMessage: '',
    ent_ok_num: okCnt
  });
})

Okay, the functionality is basically working now. Time to save the code, then work on the look and the localization of the interface.