- """
- Common helpers for working with ggml + numpy
- """
- from ggml import ffi, lib
- from typing import Union, Optional
- import numpy as np


def init(mem_size: int, mem_buffer: ffi.CData = ffi.NULL, no_alloc: bool = False) -> ffi.CData:
    """
    Initialize a ggml context, which will be freed automatically when the pointer is garbage collected.
    """
    params = ffi.new('struct ggml_init_params*')
    params.mem_size = mem_size
    params.mem_buffer = mem_buffer
    params.no_alloc = no_alloc
    # ggml_init takes its params struct by value, hence params[0]; ffi.gc ties
    # ggml_free to the lifetime of the returned context pointer.
    return ffi.gc(lib.ggml_init(params[0]), lib.ggml_free)
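
# Example usage (a sketch for illustration, not part of the module; assumes the
# ggml cffi bindings are built and this file is importable as `ggml.utils`):
#
#   from ggml import lib
#   from ggml import utils
#
#   ctx = utils.init(mem_size=16 * 1024 * 1024)  # 16 MiB arena, freed on GC
#   t = lib.ggml_new_tensor_1d(ctx, lib.GGML_TYPE_F32, 10)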

TensorLike = Union[ffi.CData, np.ndarray]


def copy(from_tensor: TensorLike, to_tensor: TensorLike, allow_requantize: bool = True):
    """
    Copy the contents of one tensor to another, doing any necessary (de/re)quantization transparently.
    Works across numpy & ggml tensors, but they must have the same shape (and be contiguous).

    Parameters
    ----------
    from_tensor : TensorLike
        The tensor to copy from (a numpy array or possibly-quantized ggml tensor)
    to_tensor : TensorLike
        The tensor to copy to (a numpy array or possibly-quantized ggml tensor)
    allow_requantize : bool
        If False, an error is raised if requantization is required (i.e. both from_tensor
        and to_tensor are quantized, with different quantization types)
    """
    if from_tensor is to_tensor:
        return

    __expect_same_layout("source", from_tensor, "destination", to_tensor)
    __check_shape_consistent_with_type(from_tensor)
    __check_shape_consistent_with_type(to_tensor)

    from_type = __get_type(from_tensor)
    to_type = __get_type(to_tensor)

    if from_type == to_type:
        # Same type on both sides: a raw byte copy is enough.
        ffi.memmove(__get_data(to_tensor), __get_data(from_tensor), __get_nbytes(from_tensor))
    else:
        assert allow_requantize or not lib.ggml_is_quantized(from_type) or not lib.ggml_is_quantized(to_type), \
            f"Requantizing from {__type_name(from_type)} to {__type_name(to_type)} is disabled. Force with allow_requantize=True"

        # Otherwise, round-trip through a temporary float32 buffer.
        __set_floats(to_tensor, __get_floats(from_tensor))
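
# Example (a sketch; assumes a context `ctx` from init() and shapes that are
# multiples of the quantization block size, e.g. 32 for Q8_0):
#
#   a = np.random.rand(32, 32).astype(np.float32)
#   q = lib.ggml_new_tensor_2d(ctx, lib.GGML_TYPE_Q8_0, 32, 32)
#   copy(a, q)  # quantizes the numpy data into q
#   copy(q, a)  # dequantizes q back into the numpy array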


def numpy(tensor: ffi.CData, allow_copy: Union[bool, np.ndarray] = False, allow_requantize: bool = False) -> np.ndarray:
    """
    Convert a ggml tensor to a numpy array.
    If the tensor isn't quantized, the returned numpy array is a view over its data.

    If it is quantized (and allow_copy is truthy), the conversion involves dequantization and the returned
    array is a copy of the original tensor (changes to the numpy array won't be reflected back in the tensor).

    Parameters
    ----------
    tensor : ffi.CData
        The tensor to convert to a numpy array
    allow_copy : bool or np.ndarray
        If False, an error is raised if the tensor is quantized (since dequantization requires a copy).
        If True, the tensor is dequantized into a new float32 numpy array.
        If an np.ndarray, the data is dequantized into the given array (which must have the same shape as the tensor)
    allow_requantize : bool
        If the copy requires requantization (i.e. allow_copy is an array whose type differs from the
        source tensor's quantization type), an error is raised unless allow_requantize is True.
    """
    shape = __get_shape(tensor)

    if lib.ggml_is_quantized(tensor.type):
        # Note: `is False` rather than `== False`, since comparing an ndarray
        # with == would broadcast elementwise.
        if allow_copy is False:
            raise ValueError(f"{__describe(tensor)} is quantized, conversion to numpy requires a copy (pass allow_copy=True; changes to the numpy array won't affect the original).")
        elif isinstance(allow_copy, np.ndarray):
            __expect_same_layout("source tensor", tensor, "dequantization output tensor", allow_copy)
            destination = allow_copy
        else:
            destination = np.empty(shape, dtype=np.float32)

        copy(tensor, destination, allow_requantize=allow_requantize)
        return destination
    else:
        dtype = __type_to_dtype(tensor.type)
        if not dtype:
            raise NotImplementedError(f'Cannot convert {__describe(tensor)} to numpy')
        assert __is_contiguous(tensor), f"Cannot convert {__describe(tensor)} to numpy (only contiguous tensors are supported)"

        nbytes = lib.ggml_nelements(tensor) * lib.ggml_type_size(tensor.type)
        array = np.frombuffer(ffi.buffer(lib.ggml_get_data(tensor), nbytes), dtype=dtype)
        array.shape = shape
        return array
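
# Example (a sketch; assumes a context `ctx` from init()):
#
#   t = lib.ggml_new_tensor_2d(ctx, lib.GGML_TYPE_F32, 4, 4)
#   view = numpy(t)        # zero-copy float32 view; writes affect the tensor
#   view[:] = 1.0
#
#   q = lib.ggml_new_tensor_2d(ctx, lib.GGML_TYPE_Q4_0, 32, 32)
#   deq = numpy(q, allow_copy=True)  # dequantized float32 copy of q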


def __type_name(type: int) -> Optional[str]:
    name = lib.ggml_type_name(type)
    return ffi.string(name).decode('utf-8') if name else None

__k_quant_types = {
    lib.GGML_TYPE_Q2_K,
    lib.GGML_TYPE_Q3_K,
    lib.GGML_TYPE_Q4_K,
    lib.GGML_TYPE_Q5_K,
    lib.GGML_TYPE_Q6_K,
    lib.GGML_TYPE_Q8_K,
}

__type_to_dtype_dict = {
    lib.GGML_TYPE_I8: np.int8,
    lib.GGML_TYPE_I16: np.int16,
    lib.GGML_TYPE_I32: np.int32,
    lib.GGML_TYPE_F16: np.float16,
    lib.GGML_TYPE_F32: np.float32,
}


def __type_to_dtype(type: int) -> Optional[np.dtype]: return __type_to_dtype_dict.get(type)


def __dtype_to_type(dtype: np.dtype) -> int:
    if dtype == np.float32: return lib.GGML_TYPE_F32
    elif dtype == np.float16: return lib.GGML_TYPE_F16
    elif dtype == np.int32: return lib.GGML_TYPE_I32
    elif dtype == np.int16: return lib.GGML_TYPE_I16
    elif dtype == np.int8: return lib.GGML_TYPE_I8
    else: raise ValueError(f"Unsupported dtype: {dtype}")


# Small accessors that make the ggml / numpy duality transparent.
def __describe(tensor: TensorLike): return f'Tensor[{__type_name(__get_type(tensor))}, {__get_shape(tensor)}]'
def __get_type(tensor: TensorLike): return __dtype_to_type(tensor.dtype) if isinstance(tensor, np.ndarray) else tensor.type
def __get_shape(x: TensorLike): return x.shape if isinstance(x, np.ndarray) else tuple([x.ne[i] for i in range(x.n_dims)])
def __get_strides(x: TensorLike): return x.strides if isinstance(x, np.ndarray) else tuple([x.nb[i] for i in range(x.n_dims)])
def __get_data(x: TensorLike) -> ffi.CData: return ffi.from_buffer(x) if isinstance(x, np.ndarray) else lib.ggml_get_data(x)
def __get_nbytes(tensor: TensorLike): return tensor.nbytes if isinstance(tensor, np.ndarray) else lib.ggml_nbytes(tensor)
def __get_nelements(tensor: TensorLike): return tensor.size if isinstance(tensor, np.ndarray) else lib.ggml_nelements(tensor)
def __is_contiguous(tensor: TensorLike): return tensor.flags['C_CONTIGUOUS'] if isinstance(tensor, np.ndarray) else lib.ggml_is_contiguous(tensor)


def __get_floats(tensor: TensorLike) -> ffi.CData:
    data, type = __get_data(tensor), __get_type(tensor)
    if type == lib.GGML_TYPE_F32:
        # Already float32: return the data pointer directly (no copy).
        return ffi.cast('float*', data)
    else:
        # Dequantize / upcast into a temporary float32 buffer.
        nelements = __get_nelements(tensor)
        floats = ffi.new('float[]', nelements)
        if type == lib.GGML_TYPE_F16:
            lib.ggml_fp16_to_fp32_row(ffi.cast('uint16_t*', data), floats, nelements)
        elif lib.ggml_is_quantized(type):
            qtype = lib.ggml_internal_get_type_traits(type)
            assert qtype.to_float, f"Type {__type_name(type)} is not supported by ggml"
            qtype.to_float(data, floats, nelements)
        else:
            raise NotImplementedError(f'Cannot read floats from {__describe(tensor)}')
        return floats


def __set_floats(tensor: TensorLike, f32_data: ffi.CData) -> None:
    data, type, nbytes = __get_data(tensor), __get_type(tensor), __get_nbytes(tensor)
    if type == lib.GGML_TYPE_F32:
        ffi.memmove(data, f32_data, nbytes)
    else:
        nelements = __get_nelements(tensor)
        if type == lib.GGML_TYPE_F16:
            lib.ggml_fp32_to_fp16_row(f32_data, ffi.cast('uint16_t*', data), nelements)
        elif lib.ggml_is_quantized(type):
            qtype = lib.ggml_internal_get_type_traits(type)
            assert qtype.from_float, f"Type {__type_name(type)} is not supported by ggml"
            qtype.from_float(f32_data, data, nelements)
        else:
            raise NotImplementedError(f'Cannot write floats to {__describe(tensor)}')


def __expect_same_layout(name1: str, tensor1: TensorLike, name2: str, tensor2: TensorLike):
    shape1, shape2 = __get_shape(tensor1), __get_shape(tensor2)
    assert shape1 == shape2, f"Shape mismatch: {name1} has {shape1} but {name2} has {shape2}"
    assert __is_contiguous(tensor1) and __is_contiguous(tensor2), \
        f"Only contiguous tensors are supported (got {name1} with strides {__get_strides(tensor1)} and {name2} with strides {__get_strides(tensor2)})"


def __check_shape_consistent_with_type(tensor: TensorLike):
    type = __get_type(tensor)
    if not lib.ggml_is_quantized(type):
        return
    shape = __get_shape(tensor)

    block_size = lib.ggml_blck_size(type)
    assert not (block_size == 0 and type in __k_quant_types), "Can't quantize: the native library was not compiled with USE_K_QUANTS!"
    assert block_size > 0, f"Invalid block size {block_size} for type {__type_name(type)}"
    for i, d in enumerate(shape):
        assert d % block_size == 0, f"Dimension {i} of {__describe(tensor)} is not divisible by {block_size}, which is required for quantization."
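
# For illustration (block sizes depend on the ggml build; these values are an
# assumption, not a guarantee): Q8_0 quantizes in blocks of 32 elements and the
# k-quants in blocks of 256, so e.g. a Q4_K tensor with a dimension of 100
# would fail the divisibility check above, while 256 or 512 would pass.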