""" Common helpers for working with ggml + numpy """ from ggml import ffi, lib from typing import Union, Optional import numpy as np def init(mem_size: int, mem_buffer: ffi.CData = ffi.NULL, no_alloc: bool = False) -> ffi.CData: """ Initialize a ggml context, which will be freed automatically when the pointer is garbage collected. """ params = ffi.new('struct ggml_init_params*') params.mem_size = mem_size params.mem_buffer = mem_buffer params.no_alloc = no_alloc return ffi.gc(lib.ggml_init(params[0]), lib.ggml_free) TensorLike = Union[ffi.CData, np.ndarray] def copy(from_tensor: TensorLike, to_tensor: TensorLike, allow_requantize: bool = True): """ Copy the contents of one tensor to another, doing any necessary (de/re)quantization transparently. Works across numpy & ggml tensors, but they must have the same shape (and be contiguous). Parameters ---------- from_tensor : TensorLike The tensor to copy from (a numpy array or possibly-quantized ggml tensor) to_tensor : TensorLike The tensor to copy to (a numpy array or possibly-quantized ggml tensor) allow_requantize : bool If False, will throw an error if requantization is required (i.e. both from_tensor and to_tensor are quantized with different quantization types) """ if id(from_tensor) == id(to_tensor): return __expect_same_layout("source", from_tensor, "destination", to_tensor) __check_shape_consistent_with_type(from_tensor) __check_shape_consistent_with_type(to_tensor) from_type = __get_type(from_tensor) to_type = __get_type(to_tensor) if from_type == to_type: ffi.memmove(__get_data(to_tensor), __get_data(from_tensor), __get_nbytes(from_tensor)) else: assert allow_requantize or not lib.ggml_is_quantized(from_type) or not lib.ggml_is_quantized(to_type), \ f"Requantizing from {__type_name(from_type)} to {__type_name(to_type)} is disabled. Force with allow_requantize=True" __set_floats(to_tensor, __get_floats(from_tensor)) def numpy(tensor: ffi.CData, allow_copy: Union[bool, np.ndarray] = False, allow_requantize=False) -> np.ndarray: """ Convert a ggml tensor to a numpy array. If the tensor isn't quantized, the returned numpy array will be a view over its data. If it is quantized (and allow_copy is True), the copy will involve dequantization and the returned array will be a copy of the original tensor (any changes to the numpy array won't then be reflected back to the tensor). Parameters ---------- tensor : ffi.CData The tensor to convert to a numpy array allow_copy : bool or np.ndarray If False, will throw an error if the tensor is quantized (since dequantization requires extra memory). If True, will dequantize the tensor and return a copy of the data in a new float32 numpy array. If an np.ndarray, will copy the data into the given array (which must be the same shape as the tensor) when dequantization is needed allow_requantize : bool If allow_copy is a tensor with a different quantization type than the source tensor, will throw an error unless allow_requantize is True. """ shape = __get_shape(tensor) if lib.ggml_is_quantized(tensor.type): if allow_copy == False: raise ValueError(f"{__describe(tensor)} is quantized, conversion to numpy requires a copy (pass allow_copy=True; changes to the numpy array won't affect the original).") elif isinstance(allow_copy, np.ndarray): __expect_same_layout("source tensor", tensor, "dequantization output tensor", allow_copy) destination = allow_copy else: destination = np.empty(shape, dtype=np.float32) copy(tensor, destination, allow_requantize=allow_requantize) return destination else: dtype = __type_to_dtype(tensor.type) if not dtype: raise NotImplementedError(f'Cannot convert {__describe(tensor)} to numpy') assert __is_contiguous(tensor), f"Cannot convert {__describe(tensor)} to numpy (support contiguous tensors only)" nbytes = lib.ggml_nelements(tensor) * lib.ggml_type_size(tensor.type) array = np.frombuffer(ffi.buffer(lib.ggml_get_data(tensor), nbytes), dtype=dtype) array.shape = shape return array def __type_name(type: int) -> str: name = lib.ggml_type_name(type) return ffi.string(name).decode('utf-8') if name else None __k_quant_types = set([ lib.GGML_TYPE_Q2_K, lib.GGML_TYPE_Q3_K, lib.GGML_TYPE_Q4_K, lib.GGML_TYPE_Q5_K, lib.GGML_TYPE_Q6_K, lib.GGML_TYPE_Q8_K, ]) __type_to_dtype_dict = { lib.GGML_TYPE_I8: np.int8, lib.GGML_TYPE_I16: np.int16, lib.GGML_TYPE_I32: np.int32, lib.GGML_TYPE_F16: np.float16, lib.GGML_TYPE_F32: np.float32, } def __type_to_dtype(type: int) -> Optional[np.dtype]: return __type_to_dtype_dict.get(type) def __dtype_to_type(dtype: np.dtype): if dtype == np.float32: return lib.GGML_TYPE_F32 elif dtype == np.float16: return lib.GGML_TYPE_F16 elif dtype == np.int32: return lib.GGML_TYPE_I32 elif dtype == np.int16: return lib.GGML_TYPE_I16 elif dtype == np.int8: return lib.GGML_TYPE_I8 else: raise ValueError(f"Unsupported dtype: {dtype}") def __describe(tensor: ffi.CType): return f'Tensor[{__type_name(__get_type(tensor))}, {__get_shape(tensor)}]' def __get_type(tensor: TensorLike): return __dtype_to_type(tensor.dtype) if isinstance(tensor, np.ndarray) else tensor.type def __get_shape(x: TensorLike): return x.shape if isinstance(x, np.ndarray) else tuple([x.ne[i] for i in range(x.n_dims)]) def __get_strides(x: TensorLike): return x.strides if isinstance(x, np.ndarray) else tuple([x.nb[i] for i in range(x.n_dims)]) def __get_data(x: TensorLike) -> ffi.CData: return ffi.from_buffer(x) if isinstance(x, np.ndarray) else lib.ggml_get_data(x) def __get_nbytes(tensor: TensorLike): return tensor.nbytes if isinstance(tensor, np.ndarray) else lib.ggml_nbytes(tensor) def __get_nelements(tensor: TensorLike): return tensor.size if isinstance(tensor, np.ndarray) else lib.ggml_nelements(tensor) def __is_contiguous(tensor: TensorLike): return tensor.flags['C_CONTIGUOUS'] if isinstance(tensor, np.ndarray) else lib.ggml_is_contiguous(tensor) def __get_floats(tensor: TensorLike) -> ffi.CData: data, type = __get_data(tensor), __get_type(tensor) if type == lib.GGML_TYPE_F32: return ffi.cast('float*', data) else: nelements = __get_nelements(tensor) floats = ffi.new('float[]', nelements) if type == lib.GGML_TYPE_F16: lib.ggml_fp16_to_fp32_row(ffi.cast('uint16_t*', data), floats, nelements) elif lib.ggml_is_quantized(type): qtype = lib.ggml_internal_get_type_traits(type) assert qtype.to_float, f"Type {__type_name(type)} is not supported by ggml" qtype.to_float(data, floats, nelements) else: raise NotImplementedError(f'Cannot read floats from {__describe(tensor)}') return floats def __set_floats(tensor: TensorLike, f32_data: ffi.CData) -> None: data, type, nbytes = __get_data(tensor), __get_type(tensor), __get_nbytes(tensor) if type == lib.GGML_TYPE_F32: ffi.memmove(data, f32_data, nbytes) else: nelements = __get_nelements(tensor) if type == lib.GGML_TYPE_F16: lib.ggml_fp32_to_fp16_row(f32_data, ffi.cast('uint16_t*', data), nelements) elif lib.ggml_is_quantized(type): qtype = lib.ggml_internal_get_type_traits(type) assert qtype.from_float, f"Type {__type_name(type)} is not supported by ggml" qtype.from_float(f32_data, data, nelements) else: raise NotImplementedError(f'Cannot write floats to {__describe(tensor)}') def __expect_same_layout(name1: str, tensor1: TensorLike, name2: str, tensor2: TensorLike): shape1, shape2 = __get_shape(tensor1), __get_shape(tensor2) assert shape1 == shape2, f"Shape mismatch: {name1} has {shape1} but {name2} has {shape2}" assert __is_contiguous(tensor1) and __is_contiguous(tensor2), f"Only contiguous tensors are supported (got {name1} with strides {__get_strides(tensor1)} and {name2} with strides {__get_strides(tensor2)})" def __check_shape_consistent_with_type(tensor: TensorLike): type = __get_type(tensor) if not lib.ggml_is_quantized(type): return shape = __get_shape(tensor) block_size = lib.ggml_blck_size(type) assert not (block_size == 0 and type in __k_quant_types), f"Can't quantize, native library was not compiled with USE_K_QUANTS!" assert block_size > 0, f"Invalid block size {block_size} for type {__type_name(type)}" for i, d in enumerate(shape): assert d % block_size == 0, f"Dimension {i} of {__describe(tensor)} is not divisible by {block_size}, required for quantization."