Source code for digit.data

import inspect
import json
import os.path

from .info import Information
from .setting import CACHE_PATH, INTERFACE_PATH,DATA_STORAGE_TYPE, DATA_DATA_TYPE, DATA_LANGUAGE, DATA_TASK_TYPE

import os
import sys
from .query import QueryDigit
import requests



import importlib.util


[docs]def find_py(path): # 获取指定路径下的所有文件 files = os.listdir(path) # 检查是否存在 __init__.py 文件,如果不存在则创建 init_file = "__init__.py" if init_file not in files: with open(os.path.join(path, init_file), "w") as f: pass # 查找名为 "code.py" 的文件 if "code.py" in files: return "code" # 判断是否只有一个py文件 py_files = [file for file in files if file.endswith(".py")] if len(py_files) == 1: return py_files[0].split(".")[0] return False
[docs]def check_config(config_path, qd): fp, fn = os.path.split(config_path) errors = [] if fn!="config.json": errors.append("config文件名错误,必须为config.json") return errors cats = ['storage_type', 'data_type', 'language', 'task_type', ] cats_name = [DATA_STORAGE_TYPE, DATA_DATA_TYPE, DATA_LANGUAGE, DATA_TASK_TYPE] category = {c: cats_name[index] for index, c in enumerate(cats)} fields = ['name','url_or_path','storage_type','data_type','language','task_type','description','tags'] fields_type = { 'name':str, 'url_or_path':str, 'storage_type':int, 'data_type':int, 'language':int, 'task_type':int, 'description':str, 'tags':list } with open(config_path, 'r', encoding='utf-8') as f: config = json.load(f) for field in fields: if field not in config: errors.append(f"{field}缺失") continue if type(config[field]) != fields_type[field]: errors.append(f"{field}字段值格式错误,应为{fields_type[field]}, 实际为{type(config[field])}") continue if field in fields[2:-2] : if config[field] not in category.get(field): errors.append(f"字段{field}的值为{config[field]},不在规定类型的数字代号范围中:{list(category.get(field).keys())}") return errors
[docs]class Data(): def __init__(self): self.info = Information() self.qd = QueryDigit() self.data_id = "" self.DD = "" # 存放加载的数据类 def _get_data_id(self,data_id_or_name): # 检测是否已经缓存, qs = self.qd.get_resources(api_type="dataid", id=data_id_or_name) if not qs: print(f"data_id_or_name = {data_id_or_name}的资源不存在") return False data_id = qs[0].get('data_id') return data_id
[docs] def load(self, data_id_or_name:str, imp_class="DigitData"): data_id = data_id_or_name path = os.path.join(CACHE_PATH, data_id_or_name) if not path: data_id = self._get_data_id(data_id_or_name=data_id_or_name) if not data_id: return False path = os.path.join(CACHE_PATH, data_id) if not os.path.exists(path): print(f"路径不存在:{path}") return False self.data_id = data_id code = find_py(path=path) if not code: print(f"未在该路径下找到代码文件") return False # 找到路径下的代码文件后 # 动态加载 # 添加路径,加载code.py文件, imp_class = imp_class sys.path.append(path) module_path = os.path.join(path,code)+".py" module_name = code spec = importlib.util.spec_from_file_location(module_name, module_path) imp_module = importlib.util.module_from_spec(spec) spec.loader.exec_module(imp_module) # ip_module = __import__('code') clss = inspect.getmembers(imp_module, inspect.isclass) print(clss) # 判断DigitData类是否存在 clss_dict = {cls_name:cls_class for cls_name, cls_class in clss} if not imp_class in clss_dict: print("code.py文件中DigitData类不存在,请检查") return False if not inspect.getmro(clss_dict[imp_class])[1].__name__ in ['TableData', 'LabelData', 'DocData', 'ImageData', 'AudioData', 'VideoData', 'GraphData']: print("DigitData类不是规定类 TableData, LabelData, DocData, ImageData, AudioData, VideoData, GraphData 的子类") return False os.chdir(path) # 加载DIgitData类 DD = getattr(imp_module, imp_class) # 获取类 self.DD = DD(data_id=data_id) return self.DD # 实例化类并返回
[docs] def run(self): self.DD.run()
# 自定义计算
[docs] def upload(self,md_instruction_path,config_path): """ :param config_file: :param md_instruction_file: :param file_upload: :param kwargs: :return: data_id 两种上传方式, 第一种文文件上传,第二种为字典格式上传 """ url = f"{INTERFACE_PATH}/api/data/" headers = {'api-token':self.info.get_api_token()} errors = check_config(config_path=config_path, qd=self.qd) if errors: print(errors) return False files = { 'cj': open(config_path, 'rb'), 'md': open(md_instruction_path, 'rb') } response = requests.post(url=url, headers=headers, files=files) if response.status_code !=200: print(f"错误,错误代码为:{response.status_code}") return False content = response.json() if content['status']!=200: print(content.get('error')) return False print("上传成功") return content['data']
[docs] def delete(self,data_id_or_name:str): data_id = self._get_data_id(data_id_or_name=data_id_or_name) if not data_id: return False url = INTERFACE_PATH + "/api/data/" headers = {'api-token':self.info.get_api_token()} response = requests.delete(url=url,headers=headers) if response.status_code != 204: print(f"删除时出现错误,错误代码{response.status_code}") return False content = response.json() if content.get("status") != 200: print(content.get('error')) return False print(content.get('msg')) return None # 成功删除
[docs] def find_path(self): if not self.data_id: print("data_id为空") return 0 path = os.path.join(CACHE_PATH, self.data_id) print(f"当前加载的代码所在路径为:{path}") return path