Project

General

Profile

Feature #18020

Updated by ioquatix (Samuel Williams) almost 3 years ago

After continuing to build out the fiber scheduler interface and the specific hooks required for `io_uring`, I found some trouble within the implementation of `IO`. 

 I found that in some cases, we need to read into the `rb_io_buffer_t` struct directly. I tried creating a "fake string" in order to transit back into the Ruby fiber scheduler interface and this did work, but I was told we cannot expose fake string to Ruby code. 

 So, after this, and many other frustrations with using `String` as a IO buffer, I decided to implement a low level `IO::Buffer` based on my needs for high performance IO, and as part of the fiber scheduler interface. Going forward, this can form the basis of newer interfaces like `IO::Buffer#splice` and so on. We can also add support for `IO#read(n, buffer)` rather than string. This avoids many encoding and alignment issues. 

 While I'm less interested in the user facing interface at this time, I believe we can introduce it incrementally. Initially my focus is on the interface requirements for the fiber scheduler. Then, I'll look at how we can integrate it more into `IO` directly. The goal is to have this in place for Ruby 3.1. 

 ## Proposed Solution 

 We introduce new class `IO::Buffer`. 

 ```ruby 
 class IO::Buffer 
   # @returns [IO::Buffer] A buffer with the contents of the string data. 
   def self.for(string) 
   end 

   PAGE_SIZE = # ... operating system page size 

   # @returns [IO::Buffer] A buffer with the contents of the file mapped to memory. 
   def self.map(file) 
   end 

   # Flags for buffer state. 
   EXTERNAL = # The buffer is from external memory. 
   INTERNAL = # The buffer is from internal memory (malloc). 
   MAPPED = # The buffer is from mapped memory (mmap, VirtualAlloc, etc) 
   LOCKED = # The buffer is locked for usage (cannot be resized) 
   PRIVATE = # The buffer is mapped as copy-on-write. 
   IMMUTABLE = # The buffer cannot be modified. 

   # @returns [IO::Buffer] A buffer with the specified size, allocated according to the given flags. 
   def initialize(size, flags) 
   end 

   # @returns [Integral] The size of the buffer 
   attr :size 

   # @returns [String] A brief summary and hex dump of the buffer. 
   def inspect 
   end 

   # @returns [String] A brief summary of the buffer. 
   def to_s 
   end 

   # Flag predicates: 
   def external? 
   end 

   def internal? 
   end 

   def mapped? 
   end 

   def locked? 
   end 

   def immutable? 
   end 

   # Flags for endian/byte order: 
   LITTLE_ENDIAN = # ... 
   BIG_ENDIAN = # ... 
   HOST_ENDIAN = # ... 
   NETWORK_ENDIAN= # ... 

   # Lock the buffer (prevent resize, unmap, changes to base and size). 
   def lock 
     raise "Already locked!" if flags & LOCKED 
    
     flags |= LOCKED 
   end 

   # Unlock the buffer. 
   def unlock 
     raise "Not locked!" unless flags & LOCKED 
    
     flags |= ~LOCKED 
   end 

   // Manipulation: 
   # @returns [IO::Buffer] A slice of the buffer's data. Does not copy. 
   def slice(offset, length) 
   end 

   # @returns [String] A binary string starting at offset, length bytes. 
   def to_str(offset, length) 
   end 

   # Copy the specified string into the buffer at the given offset. 
   def copy(string, offset) 
   end 

   # Compare two buffers. 
   def <=>(other) 
   end 

   include Comparable 

   # Resize the buffer, preserving the given length (if non-zero). 
   def resize(size, preserve = 0) 
   end 

   # Clear the buffer to the specified value. 
   def clear(value = 0, offset = 0, length = (@size - offset)) 
   end 

   # Data Types: 
   # Lower case: little endian. 
   # Upper case: big endian (network endian). 
   # 
   # :U8          | unsigned 8-bit integer. 
   # :S8          | signed 8-bit integer. 
   # 
   # :u16, :U16 | unsigned 16-bit integer. 
   # :s16, :S16 | signed 16-bit integer. 
   # 
   # :u32, :U32 | unsigned 32-bit integer. 
   # :s32, :S32 | signed 32-bit integer. 
   # 
   # :u64, :U64 | unsigned 64-bit integer. 
   # :s64, :S64 | signed 64-bit integer. 
   # 
   # :f32, :F32 | 32-bit floating point number. 
   # :f64, :F64 | 64-bit floating point number. 

   # Get the given data type at the specified offset. 
   def get(type, offset) 
   end 

   # Set the given value as the specified data type at the specified offset. 
   def set(type, offset, value) 
   end 
 end 
 ``` 

 The C interface provides a few convenient methods for accessing the underlying data buffer: 

 ```c 
 void rb_io_buffer_get_mutable(VALUE self, void **base, size_t *size); 
 void rb_io_buffer_get_immutable(VALUE self, const void **base, size_t *size); 
 ``` 

 In the fiber scheduler, it is used like this: 

 ```c 
 VALUE 
 rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length) 
 { 
     VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED); 

     VALUE result = rb_fiber_scheduler_io_read(scheduler, io, buffer, length); 

     rb_io_buffer_free(buffer); 

     return result; 
 } 
 ``` 

 This function is invoked from `io.c` at various places to fill the buffer. We specifically the `(base, size)` tuple, along with `length` which is the *minimum* length required and assists with efficient non-blocking implementation. 

 The `uring.c` implementation in the event gem uses this interface like so: 

 ```c 
 VALUE Event_Backend_URing_io_read(VALUE self, VALUE fiber, VALUE io, VALUE buffer, VALUE _length) { 
	 struct Event_Backend_URing *data = NULL; 
	 TypedData_Get_Struct(self, struct Event_Backend_URing, &Event_Backend_URing_Type, data); 
	
	 int descriptor = RB_NUM2INT(rb_funcall(io, id_fileno, 0)); 
	
	 void *base; 
	 size_t size; 
	 rb_io_buffer_get_mutable(buffer, &base, &size); 
	
	 size_t offset = 0; 
	 size_t length = NUM2SIZET(_length); 
	
	 while (length > 0) { 
		 size_t maximum_size = size - offset; 
		 int result = io_read(data, fiber, descriptor, (char*)base+offset, maximum_size); 
		
		 if (result == 0) { 
			 break; 
		 } else if (result > 0) { 
			 offset += result; 
			 if ((size_t)result > length) break; 
			 length -= result; 
		 } else if (-result == EAGAIN || -result == EWOULDBLOCK) { 
			 Event_Backend_URing_io_wait(self, fiber, io, RB_INT2NUM(READABLE)); 
		 } else { 
			 rb_syserr_fail(-result, strerror(-result)); 
		 } 
	 } 
	
	 return SIZET2NUM(offset); 
 } 
 ```

Back