Project

General

Profile

Feature #18020

Updated by ioquatix (Samuel Williams) over 2 years ago

After continuing to build out the fiber scheduler interface and the specific hooks required for `io_uring`, I found some trouble within the implementation of `IO`. 

 I found that in some cases, we need to read into the internal IO buffers `rb_io_buffer_t` struct directly. I tried creating a "fake string" in order to transit back into the Ruby fiber scheduler interface and this did work to a certain extent, work, but I was told we cannot expose fake string to Ruby scheduler interface. code. 

 So, after this, and many other frustrations with using `String` as a IO buffer, I decided to implement a low level `IO::Buffer` based on my needs for high performance IO, and as part of the fiber scheduler interface. Going forward, this can form the basis of newer interfaces like `IO::Buffer#splice` and so on. We can also add support for `IO#read(n, buffer)` rather than string. This avoids many encoding and alignment issues. 

 Here is roughly While I'm less interested in the user facing interface implemented by the scheduler w.r.t. the buffer: 

 ```ruby 
 class Scheduler 
   # @parameter buffer [IO::Buffer] Buffer for reading into. 
   def io_read(io, buffer, length) 
     # implementation provided by `read` system call, IO_URING_READV, etc. 
   end 

   # @parameter buffer [IO::Buffer] Buffer for writing from. 
   def io_write(io, buffer, length) 
     # implementation provided by `write` system call, IO_URING_WRITEV, etc. 
   end 

   # Potential new hooks (Socket#recvmsg, sendmsg, etc): 
   def io_recvmsg(io, buffer, length) 
   end 
 end 
 ``` 

 In reviewing other language designs, at this time, I found that this design believe we can introduce it incrementally. Initially my focus is very similar to Crystal's IO buffering strategy. 

 The proposed implementation provides enough of an on the interface to implement both native schedulers as well as pure Ruby schedulers. It also provides some extra functionality requirements for interpreting the data in the buffer. This fiber scheduler. Then, I'll look at how we can integrate it more into `IO` directly. The goal is mostly for testing and experimentation, although it might make sense to expose have this interface in place for binary protocols like HTTP/2, QUIC, WebSockets, etc. Ruby 3.1. 

 ## Proposed Solution 

 We introduce new class `IO::Buffer`. 

 ```ruby 
 class IO::Buffer 
   # @returns [IO::Buffer] A buffer with the contents of the string data. 
   def self.for(string) 
   end 

   PAGE_SIZE = # ... operating system page size 

   # @returns [IO::Buffer] A buffer with the contents of the file mapped to memory. 
   def self.map(file) 
   end 

   # Flags for buffer state. 
   EXTERNAL = # The buffer is from external memory. 
   INTERNAL = # The buffer is from internal memory (malloc). 
   MAPPED = # The buffer is from mapped memory (mmap, VirtualAlloc, etc) 
   LOCKED = # The buffer is locked for usage (cannot be resized) 
   PRIVATE = # The buffer is mapped as copy-on-write. 
   IMMUTABLE = # The buffer cannot be modified. 

   # @returns [IO::Buffer] A buffer with the specified size, allocated according to the given flags. 
   def initialize(size, flags) 
   end 

   # @returns [Integral] The size of the buffer 
   attr :size 

   # @returns [String] A brief summary and hex dump of the buffer. 
   def inspect 
   end 

   # @returns [String] A brief summary of the buffer. 
   def to_s 
   end 

   # Flag predicates: 
   def external? 
   end 

   def internal? 
   end 

   def mapped? 
   end 

   def locked? 
   end 

   def immutable? 
   end 

   # Flags for endian/byte order: 
   LITTLE_ENDIAN = # ... 
   BIG_ENDIAN = # ... 
   HOST_ENDIAN = # ... 
   NETWORK_ENDIAN= # ... 

   # Lock the buffer (prevent resize, unmap, changes to base and size). 
   def lock 
     raise "Already locked!" if flags & LOCKED 
    
     flags |= LOCKED 
   end 

   # Unlock the buffer. 
   def unlock 
     raise "Not locked!" unless flags & LOCKED 
    
     flags |= ~LOCKED 
   end 

   // Manipulation: 
   # @returns [IO::Buffer] A slice of the buffer's data. Does not copy. 
   def slice(offset, length) 
   end 

   # @returns [String] A binary string starting at offset, length bytes. 
   def to_str(offset, length) 
   end 

   # Copy the specified string into the buffer at the given offset. 
   def copy(string, offset) 
   end 

   # Compare two buffers. 
   def <=>(other) 
   end 

   include Comparable 

   # Resize the buffer, preserving the given length (if non-zero). 
   def resize(size, preserve = 0) 
   end 

   # Clear the buffer to the specified value. 
   def clear(value = 0, offset = 0, length = (@size - offset)) 
   end 

   # Data Types: 
   # Lower case: little endian. 
   # Upper case: big endian (network endian). 
   # 
   # :U8          | unsigned 8-bit integer. 
   # :S8          | signed 8-bit integer. 
   # 
   # :u16, :U16 | unsigned 16-bit integer. 
   # :s16, :S16 | signed 16-bit integer. 
   # 
   # :u32, :U32 | unsigned 32-bit integer. 
   # :s32, :S32 | signed 32-bit integer. 
   # 
   # :u64, :U64 | unsigned 64-bit integer. 
   # :s64, :S64 | signed 64-bit integer. 
   # 
   # :f32, :F32 | 32-bit floating point number. 
   # :f64, :F64 | 64-bit floating point number. 

   # Get the given data type at the specified offset. 
   def get(type, offset) 
   end 

   # Set the given value as the specified data type at the specified offset. 
   def set(type, offset, value) 
   end 
 end 
 ``` 

 The C interface provides a few convenient methods for accessing the underlying data buffer: 

 ```c 
 void rb_io_buffer_get_mutable(VALUE self, void **base, size_t *size); 
 void rb_io_buffer_get_immutable(VALUE self, const void **base, size_t *size); 
 ``` 

 In the fiber scheduler, it is used like this: 

 ```c 
 VALUE 
 rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length) 
 { 
     VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED); 

     VALUE result = rb_fiber_scheduler_io_read(scheduler, io, buffer, length); 

     rb_io_buffer_free(buffer); 

     return result; 
 } 
 ``` 

 This function is invoked from `io.c` at various places to fill the buffer. We specifically the `(base, size)` tuple, along with `length` which is the *minimum* length required and assists with efficient non-blocking implementation. 

 The `uring.c` implementation in the event gem uses this interface like so: 

 ```c 
 VALUE Event_Backend_URing_io_read(VALUE self, VALUE fiber, VALUE io, VALUE buffer, VALUE _length) { 
	 struct Event_Backend_URing *data = NULL; 
	 TypedData_Get_Struct(self, struct Event_Backend_URing, &Event_Backend_URing_Type, data); 
	
	 int descriptor = RB_NUM2INT(rb_funcall(io, id_fileno, 0)); 
	
	 void *base; 
	 size_t size; 
	 rb_io_buffer_get_mutable(buffer, &base, &size); 
	
	 size_t offset = 0; 
	 size_t length = NUM2SIZET(_length); 
	
	 while (length > 0) { 
		 size_t maximum_size = size - offset; 
		 int result = io_read(data, fiber, descriptor, (char*)base+offset, maximum_size); 
		
		 if (result == 0) { 
			 break; 
		 } else if (result > 0) { 
			 offset += result; 
			 if ((size_t)result > length) break; 
			 length -= result; 
		 } else if (-result == EAGAIN || -result == EWOULDBLOCK) { 
			 Event_Backend_URing_io_wait(self, fiber, io, RB_INT2NUM(READABLE)); 
		 } else { 
			 rb_syserr_fail(-result, strerror(-result)); 
		 } 
	 } 
	
	 return SIZET2NUM(offset); 
 } 
 ```

Back