Skip to content

OTA 中有 tar.gz 压缩包升级方式,请问这个压缩包是如何制作的? #2

@Connor-WangK

Description

@Connor-WangK
class FileDecode(object):
    """Streaming extractor for a gzip-compressed tar archive (``.tar.gz``).

    The archive is decompressed block by block through ``uzlib.DecompIO`` and
    every entry is written below ``parent_dir``.  Only the tar header fields
    this class needs are parsed: the entry name (bytes 0-99), the octal size
    (bytes 124-134) and the type flag (byte 156).

    Typical use: ``fd = FileDecode(path); fd.unzip(); ok = fd.unpack()``.
    """

    # Tar archives are organised in fixed 512-byte blocks.
    BLOCK_SIZE = 0x200

    def __init__(self, zip_file, parent_dir="/fota/usr/"):
        """Open ``zip_file`` for reading.

        @zip_file: path of the ``.tar.gz`` archive.
        @parent_dir: prefix prepended (by plain string concatenation) to every
            entry name, so it is expected to end with a path separator.
        """
        self.data = b''
        # Closed by unpack(), whether extraction succeeds or fails.
        self.fp = open(zip_file, "rb")
        self.fileData = None
        self.parent_dir = parent_dir
        self.update_file_list = []

    def get_update_files(self):
        """Return the list of ``{"file_name", "size"}`` dicts for extracted files."""
        return self.update_file_list

    def unzip(self):
        """Attach a streaming decompressor to the opened archive.

        The first 10 bytes (the fixed part of a gzip header) are skipped and
        the rest is handed to ``uzlib.DecompIO`` with ``wbits=-15``.
        NOTE(review): this presumes the gzip header carries no optional
        fields (e.g. an embedded original file name) -- confirm against the
        tool used to build the package.
        """
        self.fp.seek(10)
        self.fileData = uzlib.DecompIO(self.fp, -15, 1)

    @classmethod
    def _ascii_trip(cls, data):
        """Decode an ASCII header field and strip its trailing NUL padding."""
        return data.decode('ascii').rstrip('\0')

    @classmethod
    def file_size(cls, data):
        """Parse the octal size field of a tar header; an empty field means 0."""
        size = cls._ascii_trip(data)
        if not size:
            return 0
        return int(size, 8)

    @classmethod
    def get_file_name(cls, file_name):
        """Return the entry name stored in a tar header name field."""
        return cls._ascii_trip(file_name)

    def get_data(self):
        """Read the next 512-byte block from the decompressed stream."""
        return self.fileData.read(self.BLOCK_SIZE)

    def _extract_file(self, full_file_name, size):
        """Copy ``size`` payload bytes from the stream into ``full_file_name``."""
        remaining = size
        # The context manager closes the output file even if a read or write fails.
        with open(full_file_name, "wb+") as update_file:
            while remaining > 0:
                self.data = self.get_data()
                if not self.data:
                    # The stream ended before the size announced in the header.
                    raise ValueError("truncated archive")
                # The last block is zero-padded; keep only the real payload.
                update_file.write(self.data[:remaining])
                remaining -= self.BLOCK_SIZE

    def unpack(self):
        """Extract every archive entry below ``parent_dir``.

        Directories are created with ``ql_fs.mkdirs``; files are written and
        recorded in ``update_file_list``.  ``unzip()`` must have been called
        first.  The archive file handle is always closed on return.

        Returns True on success and False if any error occurred.
        """
        try:
            self.data = self.get_data()
            while self.data:
                header = self.data
                file_name = self.get_file_name(header[:100])
                if not file_name:
                    # A header with an empty name is the zero-filled
                    # end-of-archive marker: nothing left to extract.
                    break
                size = self.file_size(header[124:135])
                full_file_name = self.parent_dir + file_name
                type_flag = header[156:157]
                # A zero-size entry is an empty regular file only when its
                # type flag says so and its name has no trailing slash; any
                # other zero-size entry is handled as a directory.
                is_dir = file_name.endswith('/') or type_flag not in (b'0', b'\x00')
                if size or not is_dir:
                    self._extract_file(full_file_name, size)
                    self.update_file_list.append({"file_name": file_name, "size": size})
                else:
                    ql_fs.mkdirs(full_file_name)
                self.data = self.get_data()
        except Exception:
            # Callers only test the boolean result; report any failure as False.
            return False
        else:
            return True
        finally:
            self.fp.close()


class AppFota(object):
    """Application-level FOTA helper built on top of ``BaseAppFota``.

    Besides delegating single and bulk downloads to the underlying object, it
    can download a ``.tar.gz`` package and unpack it into the updater
    directory with ``FileDecode``.
    """

    def __init__(self):
        self.fota = BaseAppFota.new()

    def set_update_flag(self):
        """Set the upgrade flag.

        The upgrade is triggered after a reboot only if the upgrade files
        were downloaded successfully AND this flag has been set; otherwise no
        upgrade takes place.
        """
        self.fota.set_update_flag()

    def download(self, url, file_name):
        """Download a single upgrade file.

        @url: URL of the file to download, type str.
        @file_name: absolute local path of the file to upgrade, type str.

        Returns True when the underlying download reports 0, else False.
        """
        return self.fota.download(url, file_name) == 0

    def bulk_download(self, info):
        """Download several upgrade files.

        @info: list of dicts, each holding a ``url`` and a ``file_name``.

        Returns whatever the underlying ``bulk_download`` returns.
        """
        return self.fota.bulk_download(info)

    @staticmethod
    def __download_file_from_server(url, path):
        """Fetch ``url`` and store the response body at ``path``.

        Returns True once the body has been written, or False (without
        touching ``path``) when the HTTP status is neither 200 nor 206.
        """
        response = request.get(url)
        if response.status_code not in (200, 206):
            return False
        with open(path, 'wb') as f:
            for c in response.content:
                f.write(c)
        return True

    @staticmethod
    def __decode_file_to_updater_dir(path, updater_dir):
        """Unpack the archive at ``path`` into ``updater_dir``.

        Every extracted file is registered through ``update_download_stat``
        with the path ``'/usr/' + file_name`` and its size.
        Returns True on success, False if unpacking failed.
        """
        fd = FileDecode(path, parent_dir=updater_dir)
        ql_fs.mkdirs(updater_dir)
        fd.unzip()
        if not fd.unpack():
            return False
        for file in fd.update_file_list:
            update_download_stat('', '/usr/' + file['file_name'], file['size'])
        return True

    def download_tar(self, url, path="/usr/temp.tar.gz"):
        """Download a ``.tar.gz`` upgrade package and unpack it for upgrading.

        @url: URL of the compressed package, type str.
        @path: local path used to store the downloaded package, type str.

        Returns True when the package was downloaded and unpacked (the
        downloaded archive is then removed), False otherwise.
        """
        # Stop here on a failed download: unpacking would otherwise operate on
        # whatever file (if any) a previous attempt left at `path`.
        if not self.__download_file_from_server(url, path):
            return False
        updater_dir = self.fota.app_fota_pkg_mount.fota_dir + '/usr/.updater/usr/'
        if not self.__decode_file_to_updater_dir(path, updater_dir):
            return False
        uos.remove(path)
        return True

这个解压缩函数对应的压缩包是如何制作的?

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions